Merge branch 'master' into add-test-44816

Alexey Milovidov 2023-07-09 05:50:24 +03:00 committed by GitHub
commit b5b814860a
192 changed files with 1875 additions and 1330 deletions

View File

@ -75,51 +75,6 @@ jobs:
Codebrowser:
needs: [DockerHubPush]
uses: ./.github/workflows/woboq.yml
BuilderCoverity:
needs: DockerHubPush
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
BUILD_NAME=coverity
CACHES_PATH=${{runner.temp}}/../ccaches
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
TEMP_PATH=${{runner.temp}}/build_check
EOF
echo "COVERITY_TOKEN=${{ secrets.COVERITY_TOKEN }}" >> "$GITHUB_ENV"
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload Coverity Analysis
if: ${{ success() || failure() }}
run: |
curl --form token="${COVERITY_TOKEN}" \
--form email='security+coverity@clickhouse.com' \
--form file="@$TEMP_PATH/$BUILD_NAME/coverity-scan.tar.gz" \
--form version="${GITHUB_REF#refs/heads/}-${GITHUB_SHA::6}" \
--form description="Nightly Scan: $(date +'%Y-%m-%dT%H:%M:%S')" \
https://scan.coverity.com/builds?project=ClickHouse%2FClickHouse
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
SonarCloud:
runs-on: [self-hosted, builder]
env:

View File

@ -344,9 +344,9 @@ if (COMPILER_CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-absolute-paths")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-absolute-paths")
if (NOT ENABLE_TESTS AND NOT SANITIZE)
if (NOT ENABLE_TESTS AND NOT SANITIZE AND OS_LINUX)
# https://clang.llvm.org/docs/ThinLTO.html
# Applies to clang only.
# Applies to clang and linux only.
# Disabled when building with tests or sanitizers.
option(ENABLE_THINLTO "Clang-specific link time optimization" ON)
endif()

View File

@ -15,25 +15,34 @@
static thread_local uint64_t current_tid = 0;
static void setCurrentThreadId()
{
#if defined(OS_ANDROID)
current_tid = gettid();
#elif defined(OS_LINUX)
current_tid = static_cast<uint64_t>(syscall(SYS_gettid)); /// This call is always successful. - man gettid
#elif defined(OS_FREEBSD)
current_tid = pthread_getthreadid_np();
#elif defined(OS_SUNOS)
// On Solaris-derived systems, this returns the ID of the LWP, analogous
// to a thread.
current_tid = static_cast<uint64_t>(pthread_self());
#else
if (0 != pthread_threadid_np(nullptr, &current_tid))
throw std::logic_error("pthread_threadid_np returned error");
#endif
}
uint64_t getThreadId()
{
if (!current_tid)
{
#if defined(OS_ANDROID)
current_tid = gettid();
#elif defined(OS_LINUX)
current_tid = static_cast<uint64_t>(syscall(SYS_gettid)); /// This call is always successful. - man gettid
#elif defined(OS_FREEBSD)
current_tid = pthread_getthreadid_np();
#elif defined(OS_SUNOS)
// On Solaris-derived systems, this returns the ID of the LWP, analogous
// to a thread.
current_tid = static_cast<uint64_t>(pthread_self());
#else
if (0 != pthread_threadid_np(nullptr, &current_tid))
throw std::logic_error("pthread_threadid_np returned error");
#endif
}
setCurrentThreadId();
return current_tid;
}
void updateCurrentThreadIdAfterFork()
{
setCurrentThreadId();
}

View File

@ -3,3 +3,5 @@
/// Obtain thread id from OS. The value is cached in thread local variable.
uint64_t getThreadId();
void updateCurrentThreadIdAfterFork();

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -49,8 +49,8 @@ ENV CARGO_HOME=/rust/cargo
ENV PATH="/rust/cargo/bin:${PATH}"
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup toolchain install nightly && \
rustup default nightly && \
rustup toolchain install nightly-2023-07-04 && \
rustup default nightly-2023-07-04 && \
rustup component add rust-src && \
rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -291,7 +291,7 @@ quit
if [ "$server_died" == 1 ]
then
# The server has died.
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*' server.log > description.txt
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*|.*Child process was terminated by signal 9.*' server.log > description.txt
then
echo "Lost connection to server. See the logs." > description.txt
fi

View File

@ -92,8 +92,8 @@ sudo clickhouse stop ||:
for _ in $(seq 1 60); do if [[ $(wget --timeout=1 -q 'localhost:8123' -O-) == 'Ok.' ]]; then sleep 1 ; else break; fi ; done
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
rg -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
zstd < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.zst &
# Compressed (FIXME: remove once only github actions will be left)
rm /var/log/clickhouse-server/clickhouse-server.log

View File

@ -33,7 +33,6 @@ RUN apt-get update -y \
qemu-user-static \
sqlite3 \
sudo \
telnet \
tree \
unixodbc \
wget \

View File

@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean

View File

@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean

View File

@ -44,7 +44,6 @@ RUN apt-get update \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
fakeroot \
gdb \
git \
gperf \

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v22.8.20.11-lts (c9ca79e24e8) FIXME as compared to v22.8.19.10-lts (989bc2fe8b0)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix broken index analysis when binary operator contains a null constant argument [#50177](https://github.com/ClickHouse/ClickHouse/pull/50177) ([Amos Bird](https://github.com/amosbird)).
* Fix incorrect constant folding [#50536](https://github.com/ClickHouse/ClickHouse/pull/50536) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix fuzzer failure in ActionsDAG [#51301](https://github.com/ClickHouse/ClickHouse/pull/51301) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix segfault in MathUnary [#51499](https://github.com/ClickHouse/ClickHouse/pull/51499) ([Ilya Yatsishin](https://github.com/qoega)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Decoupled commits from [#51180](https://github.com/ClickHouse/ClickHouse/issues/51180) for backports [#51561](https://github.com/ClickHouse/ClickHouse/pull/51561) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -0,0 +1,25 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.6.2.18-stable (89f39a7ccfe) FIXME as compared to v23.6.1.1524-stable (d1c7e13d088)
#### Build/Testing/Packaging Improvement
* Backported in [#51888](https://github.com/ClickHouse/ClickHouse/issues/51888): Update cargo dependencies. [#51721](https://github.com/ClickHouse/ClickHouse/pull/51721) ([Raúl Marín](https://github.com/Algunenano)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix reading from empty column in `parseSipHashKey` [#51804](https://github.com/ClickHouse/ClickHouse/pull/51804) ([Nikita Taranov](https://github.com/nickitat)).
* Allow parametric UDFs [#51964](https://github.com/ClickHouse/ClickHouse/pull/51964) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Remove the usage of Analyzer setting in the client [#51578](https://github.com/ClickHouse/ClickHouse/pull/51578) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix 02116_tuple_element with Analyzer [#51669](https://github.com/ClickHouse/ClickHouse/pull/51669) ([Robert Schulze](https://github.com/rschu1ze)).
* Fix SQLLogic docker images [#51719](https://github.com/ClickHouse/ClickHouse/pull/51719) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix source image for sqllogic [#51728](https://github.com/ClickHouse/ClickHouse/pull/51728) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Pin for docker-ce [#51743](https://github.com/ClickHouse/ClickHouse/pull/51743) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -33,6 +33,15 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
- `options` — MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering, please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
## Usage Example {#usage-example}
Create a table in ClickHouse that reads data from a MongoDB collection:
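A minimal sketch, assuming the usual MongoDB engine argument order (`host:port, database, collection, user, password[, options]`); the host, credentials, and columns below are placeholders:

```sql
CREATE TABLE mongo_table
(
    key UInt64,
    data String
) ENGINE = MongoDB('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse_password',
                   'connectTimeoutMS=10000&ssl=true&authSource=admin');
```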

View File

@ -37,8 +37,8 @@ The [Merge](/docs/en/engines/table-engines/special/merge.md/#merge) engine does
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr1] [TTL expr1] [CODEC(codec1)] [[NOT] NULL|PRIMARY KEY],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr2] [TTL expr2] [CODEC(codec2)] [[NOT] NULL|PRIMARY KEY],
...
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],
@ -439,41 +439,41 @@ Syntax: `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions,
- `number_of_hash_functions` — The number of hash functions used in the Bloom filter.
- `random_seed` — The seed for Bloom filter hash functions.
Users can create [UDF](/docs/en/sql-reference/statements/create/function.md) to estimate the parameter set of `ngrambf_v1`. Query statements are as follows:
```sql
CREATE FUNCTION bfEstimateFunctions [ON CLUSTER cluster]
AS
(total_number_of_all_grams, size_of_bloom_filter_in_bits) -> round((size_of_bloom_filter_in_bits / total_number_of_all_grams) * log(2));
CREATE FUNCTION bfEstimateBmSize [ON CLUSTER cluster]
AS
(total_number_of_all_grams, probability_of_false_positives) -> ceil((total_number_of_all_grams * log(probability_of_false_positives)) / log(1 / pow(2, log(2))));
CREATE FUNCTION bfEstimateFalsePositive [ON CLUSTER cluster]
AS
(total_number_of_all_grams, number_of_hash_functions, size_of_bloom_filter_in_bytes) -> pow(1 - exp(-number_of_hash_functions / (size_of_bloom_filter_in_bytes / total_number_of_all_grams)), number_of_hash_functions);
CREATE FUNCTION bfEstimateGramNumber [ON CLUSTER cluster]
AS
(number_of_hash_functions, probability_of_false_positives, size_of_bloom_filter_in_bytes) -> ceil(size_of_bloom_filter_in_bytes / (-number_of_hash_functions / log(1 - exp(log(probability_of_false_positives) / number_of_hash_functions))))
```
To use those functions, we need to specify at least two parameters.
For example, if there are 4300 ngrams in the granule and we expect false positives to be less than 0.0001, the other parameters can be estimated by executing the following queries:
```sql
--- estimate number of bits in the filter
SELECT bfEstimateBmSize(4300, 0.0001) / 8 as size_of_bloom_filter_in_bytes;
┌─size_of_bloom_filter_in_bytes─┐
│ 10304 │
└───────────────────────────────┘
--- estimate number of hash functions
SELECT bfEstimateFunctions(4300, bfEstimateBmSize(4300, 0.0001)) as number_of_hash_functions
┌─number_of_hash_functions─┐
│ 13 │
└──────────────────────────┘
@ -991,7 +991,7 @@ use a local disk to cache data from a table stored at a URL. Neither the cache d
nor the web storage is configured in the ClickHouse configuration files; both are
configured in the CREATE/ATTACH query settings.
In the settings highlighted below notice that the disk of `type=web` is nested within
the disk of `type=cache`.
```sql
@ -1308,7 +1308,7 @@ configuration file.
In this sample configuration:
- the disk is of type `web`
- the data is hosted at `http://nginx:80/test1/`
- a cache on local storage is used
```xml
<clickhouse>

View File

@ -17,7 +17,8 @@ Default value: 0.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -30,7 +31,7 @@ insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
```sql
SELECT *
FROM table_1
SETTINGS additional_table_filters = (('table_1', 'x != 2'))
SETTINGS additional_table_filters = {'table_1': 'x != 2'}
```
```response
┌─x─┬─y────┐
@ -50,7 +51,8 @@ Default value: `''`.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -3535,7 +3537,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
## http_receive_timeout {#http_receive_timeout}
@ -3546,7 +3548,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
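If a workload still needs the previous 3-minute timeouts, the old values can be restored per session; a minimal sketch using the two settings renamed above:

```sql
SET http_send_timeout = 180, http_receive_timeout = 180;
```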
## check_query_single_value_result {#check_query_single_value_result}

View File

@ -171,12 +171,13 @@ Result:
└──────────────────────────────┘
```
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type).
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type). It also requires the `execute_direct` option (to ensure no shell argument expansion vulnerability).
File `test_function_parameter_python.xml` (`/etc/clickhouse-server/test_function_parameter_python.xml` with default path settings).
```xml
<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>

View File

@ -30,6 +30,14 @@ mongodb(host:port, database, collection, user, password, structure [, options])
- `options` - MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering, please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
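For illustration, a hedged sketch of a call that passes these options, following the signature above (host, credentials, and structure are placeholders):

```sql
SELECT *
FROM mongodb('mongo1:27017', 'test', 'simple_table', 'testuser', 'clickhouse_password',
             'key UInt64, data String',
             'connectTimeoutMS=10000&ssl=true&authSource=admin');
```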
**Returned Value**

View File

@ -3,13 +3,6 @@ slug: /zh/development/build
---
# How to Build a ClickHouse Release Package {#ru-he-gou-jian-clickhouse-fa-bu-bao}
## Install Git and Pbuilder {#an-zhuang-git-he-pbuilder}
``` bash
sudo apt-get update
sudo apt-get install git pbuilder debhelper lsb-release fakeroot sudo debian-archive-keyring debian-keyring
```
## Pull the ClickHouse Source Code {#la-qu-clickhouse-yuan-ma}
``` bash

View File

@ -485,7 +485,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(*servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.", current_connections);

View File

@ -75,6 +75,15 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
}
void applySettingsOverridesForLocal(ContextMutablePtr context)
{
Settings settings = context->getSettings();
settings.allow_introspection_functions = true;
settings.storage_file_read_method = LocalFSReadMethod::mmap;
context->setSettings(settings);
}
void LocalServer::processError(const String &) const
{
@ -668,6 +677,12 @@ void LocalServer::processConfig()
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
#endif
/// NOTE: it is important to apply any overrides before
/// setDefaultProfiles() calls since it will copy current context (i.e.
/// there is separate context for Buffer tables).
applySettingsOverridesForLocal(global_context);
applyCmdOptions(global_context);
/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
@ -682,7 +697,6 @@ void LocalServer::processConfig()
std::string default_database = config().getString("default_database", "_local");
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
global_context->setCurrentDatabase(default_database);
applyCmdOptions(global_context);
if (config().has("path"))
{

View File

@ -1146,7 +1146,16 @@ try
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
if (merges_mutations_memory_usage_soft_limit == 0)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
@ -1523,7 +1532,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
@ -1828,7 +1837,7 @@ try
global_context->getProcessList().killAllQueries();
if (current_connections)
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_WARNING(log, "Closed connections. But {} remain."

View File

@ -158,7 +158,6 @@ enum class AccessType
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_DICTIONARY, "SYSTEM RELOAD DICTIONARIES, RELOAD DICTIONARY, RELOAD DICTIONARIES", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \

View File

@ -51,7 +51,8 @@ private:
T value = T{};
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -501,7 +502,8 @@ private:
char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -769,7 +771,7 @@ static_assert(
/// For any other value types.
template <bool IS_NULLABLE = false>
template <bool RESULT_IS_NULLABLE = false>
struct SingleValueDataGeneric
{
private:
@ -779,12 +781,13 @@ private:
bool has_value = false;
public:
static constexpr bool is_nullable = IS_NULLABLE;
static constexpr bool result_is_nullable = RESULT_IS_NULLABLE;
static constexpr bool should_skip_null_arguments = !RESULT_IS_NULLABLE;
static constexpr bool is_any = false;
bool has() const
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
return has_value;
return !value.isNull();
}
@ -820,14 +823,14 @@ public:
void change(const IColumn & column, size_t row_num, Arena *)
{
column.get(row_num, value);
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
void change(const Self & to, Arena *)
{
value = to.value;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
@ -844,7 +847,7 @@ public:
bool changeFirstTime(const Self & to, Arena * arena)
{
if (!has() && (is_nullable || to.has()))
if (!has() && (result_is_nullable || to.has()))
{
change(to, arena);
return true;
@ -879,7 +882,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -910,7 +913,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!has())
{
@ -945,7 +948,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -975,7 +978,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!value.isNull() && (to.value.isNull() || value < to.value))
{
@ -1138,13 +1141,20 @@ struct AggregateFunctionAnyLastData : Data
#endif
};
/** The aggregate function 'singleValueOrNull' is used to implement subquery operators,
* such as x = ALL (SELECT ...)
* It checks if there is only one unique non-NULL value in the data.
* If there is only one unique value - returns it.
* If there are zero or at least two distinct values - returns NULL.
*/
template <typename Data>
struct AggregateFunctionSingleValueOrNullData : Data
{
static constexpr bool is_nullable = true;
using Self = AggregateFunctionSingleValueOrNullData;
static constexpr bool result_is_nullable = true;
bool first_value = true;
bool is_null = false;
@ -1166,7 +1176,7 @@ struct AggregateFunctionSingleValueOrNullData : Data
if (!to.has())
return;
if (first_value)
if (first_value && !to.first_value)
{
first_value = false;
this->change(to, arena);
@ -1311,7 +1321,7 @@ public:
static DataTypePtr createResultType(const DataTypePtr & type_)
{
if constexpr (Data::is_nullable)
if constexpr (Data::result_is_nullable)
return makeNullable(type_);
return type_;
}
@ -1431,13 +1441,13 @@ public:
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function,
const AggregateFunctionPtr & original_function,
const DataTypes & /*arguments*/,
const Array & /*params*/,
const AggregateFunctionProperties & /*properties*/) const override
{
if (Data::is_nullable)
return nested_function;
if (Data::result_is_nullable && !Data::should_skip_null_arguments)
return original_function;
return nullptr;
}
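The comment above `AggregateFunctionSingleValueOrNullData` describes the observable behavior; a minimal SQL sketch of it:

```sql
SELECT singleValueOrNull(x) FROM (SELECT 1 AS x);                 -- returns 1: exactly one distinct value
SELECT singleValueOrNull(x) FROM (SELECT arrayJoin([1, 2]) AS x); -- returns NULL: two distinct values
```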

View File

@ -116,7 +116,6 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
/** Query analyzer implementation overview. Please check documentation in QueryAnalysisPass.h first.
@ -4897,11 +4896,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
lambda_expression_untyped->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
if (!parameters.empty())
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_node.formatASTForErrorMessage());
}
auto lambda_expression_clone = lambda_expression_untyped->clone();
IdentifierResolveScope lambda_scope(lambda_expression_clone, &scope /*parent_scope*/);
@ -5018,12 +5012,9 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
}
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, scope.context, parameters);
bool is_executable_udf = false;
if (!function)
function = FunctionFactory::instance().tryGet(function_name, scope.context);
else
is_executable_udf = true;
if (!function)
{
@ -5074,12 +5065,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
return result_projection_names;
}
/// Executable UDFs may have parameters. They are checked in UserDefinedExecutableFunctionFactory.
if (!parameters.empty() && !is_executable_udf)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_name);
}
/** For lambda arguments we need to initialize lambda argument types DataTypeFunction using `getLambdaArgumentTypes` function.
* Then each lambda arguments are initialized with columns, where column source is lambda.
* This information is important for later steps of query processing.
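With the `FUNCTION_CANNOT_HAVE_PARAMETERS` check removed for executable UDFs (parameters are now validated in `UserDefinedExecutableFunctionFactory` instead), parametric calls to functions like the `test_function_parameter_python` function configured earlier in this commit become possible; a hedged sketch of the call syntax, with illustrative values:

```sql
-- parameters go in the first parentheses, arguments in the second
SELECT test_function_parameter_python(1)(2);
```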

View File

@ -253,6 +253,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
{
return std::make_unique<WriteBufferFromS3>(
client,
client, // already has long timeout
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,

View File

@ -575,9 +575,11 @@ try
}
auto flags = O_WRONLY | O_EXCL;
if (query_with_output->is_outfile_append)
auto file_exists = fs::exists(out_file);
if (file_exists && query_with_output->is_outfile_append)
flags |= O_APPEND;
else if (query_with_output->is_outfile_truncate)
else if (file_exists && query_with_output->is_outfile_truncate)
flags |= O_TRUNC;
else
flags |= O_CREAT;

View File

@ -8,7 +8,7 @@
* See also: https://gcc.gnu.org/legacy-ml/gcc-help/2017-12/msg00021.html
*/
#ifdef NDEBUG
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 128 * (1ULL << 20);
#else
/**
* In debug build, use small mmap threshold to reproduce more memory

View File

@ -120,6 +120,15 @@ void Timer::createIfNecessary(UInt64 thread_id, int clock_type, int pause_signal
throw Exception(ErrorCodes::CANNOT_CREATE_TIMER, "Failed to create thread timer. The function "
"'timer_create' returned non-zero but didn't set errno. This is bug in your OS.");
/// For example, it cannot be created if the server is run under QEMU:
/// "Failed to create thread timer, errno: 11, strerror: Resource temporarily unavailable."
/// You could accidentally run the server under QEMU without being aware,
/// if you use a Docker image for a different architecture,
/// and you have the "binfmt-misc" kernel module, and "qemu-user" tools.
/// Also, it cannot be created if the server has too many threads.
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
}
timer_id.emplace(local_timer_id);

View File

@ -793,88 +793,6 @@ public:
}
};
// Searches for needle surrounded by token-separators.
// Separators are anything inside ASCII (0-128) and not alphanum.
// Any value outside of basic ASCII (>=128) is considered a non-separator symbol, hence UTF-8 strings
// should work just fine. But any Unicode whitespace is not considered a token separator.
template <typename StringSearcher>
class TokenSearcher : public StringSearcherBase
{
StringSearcher searcher;
size_t needle_size;
public:
template <typename CharT>
requires (sizeof(CharT) == 1)
static bool isValidNeedle(const CharT * needle_, size_t needle_size_)
{
return std::none_of(needle_, needle_ + needle_size_, isTokenSeparator);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
TokenSearcher(const CharT * needle_, size_t needle_size_)
: searcher(needle_, needle_size_)
, needle_size(needle_size_)
{
/// The caller is responsible for calling isValidNeedle()
chassert(isValidNeedle(needle_, needle_size_));
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool compare(const CharT * haystack, const CharT * haystack_end, const CharT * pos) const
{
// use searcher only if pos is in the beginning of token and pos + searcher.needle_size is end of token.
if (isToken(haystack, haystack_end, pos))
return searcher.compare(haystack, haystack_end, pos);
return false;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, const CharT * const haystack_end) const
{
// use searcher.search(), then verify that returned value is a token
// if it is not, skip it and re-run
const auto * pos = haystack;
while (pos < haystack_end)
{
pos = searcher.search(pos, haystack_end);
if (pos == haystack_end || isToken(haystack, haystack_end, pos))
return pos;
// assuming that the needle does not contain any token separators.
pos += needle_size;
}
return haystack_end;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, size_t haystack_size) const
{
return search(haystack, haystack + haystack_size);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool isToken(const CharT * haystack, const CharT * const haystack_end, const CharT* p) const
{
return (p == haystack || isTokenSeparator(*(p - 1)))
&& (p + needle_size >= haystack_end || isTokenSeparator(*(p + needle_size)));
}
ALWAYS_INLINE static bool isTokenSeparator(const uint8_t c)
{
return !(isAlphaNumericASCII(c) || !isASCII(c));
}
};
}
using ASCIICaseSensitiveStringSearcher = impl::StringSearcher<true, true>;
@ -882,9 +800,6 @@ using ASCIICaseInsensitiveStringSearcher = impl::StringSearcher<false, true>;
using UTF8CaseSensitiveStringSearcher = impl::StringSearcher<true, false>;
using UTF8CaseInsensitiveStringSearcher = impl::StringSearcher<false, false>;
using ASCIICaseSensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseSensitiveStringSearcher>;
using ASCIICaseInsensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseInsensitiveStringSearcher>;
/// Use only with short haystacks where cheap initialization is required.
template <bool CaseInsensitive>
struct StdLibASCIIStringSearcher
@ -906,11 +821,11 @@ struct StdLibASCIIStringSearcher
if constexpr (CaseInsensitive)
return std::search(
haystack_start, haystack_end, needle_start, needle_end,
[](char c1, char c2) {return std::toupper(c1) == std::toupper(c2);});
[](char c1, char c2) { return std::toupper(c1) == std::toupper(c2); });
else
return std::search(
haystack_start, haystack_end, needle_start, needle_end,
[](char c1, char c2) {return c1 == c2;});
[](char c1, char c2) { return c1 == c2; });
}
template <typename CharT>

View File

@ -9,7 +9,6 @@
#include <link.h>
//#include <iostream>
#include <filesystem>
#include <base/sort.h>
@ -561,13 +560,6 @@ MultiVersion<SymbolIndex>::Version SymbolIndex::instance()
return instanceImpl().get();
}
void SymbolIndex::reload()
{
instanceImpl().set(std::unique_ptr<SymbolIndex>(new SymbolIndex));
/// Also drop stacktrace cache.
StackTrace::dropCache();
}
}
#endif

View File

@ -24,7 +24,6 @@ protected:
public:
static MultiVersion<SymbolIndex>::Version instance();
static void reload();
struct Symbol
{

View File

@ -199,13 +199,14 @@ ThreadStatus::~ThreadStatus()
if (deleter)
deleter();
chassert(!check_current_thread_on_destruction || current_thread == this);
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViews chain creates and deletes ThreadStatus instances while running in the main query thread
if (check_current_thread_on_destruction)
{
assert(current_thread == this);
if (current_thread == this)
current_thread = nullptr;
}
else if (check_current_thread_on_destruction)
LOG_ERROR(log, "current_thread contains invalid address");
}
void ThreadStatus::updatePerformanceCounters()

View File

@ -730,9 +730,6 @@ using VolnitskyUTF8 = VolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher
using VolnitskyCaseInsensitive = VolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>; /// ignores non-ASCII bytes
using VolnitskyCaseInsensitiveUTF8 = VolnitskyBase<false, false, UTF8CaseInsensitiveStringSearcher>;
using VolnitskyCaseSensitiveToken = VolnitskyBase<true, true, ASCIICaseSensitiveTokenSearcher>;
using VolnitskyCaseInsensitiveToken = VolnitskyBase<false, true, ASCIICaseInsensitiveTokenSearcher>;
using MultiVolnitsky = MultiVolnitskyBase<true, true, ASCIICaseSensitiveStringSearcher>;
using MultiVolnitskyUTF8 = MultiVolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher>;
using MultiVolnitskyCaseInsensitive = MultiVolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>;

View File

@ -145,14 +145,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
return WriteBufferFromS3
{
return WriteBufferFromS3(
s3_client->client,
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
};
);
};
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);

View File

@ -41,7 +41,7 @@
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 180
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 30
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum number of http-connections between two endpoints
/// the number is unmotivated

View File

@ -102,6 +102,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
@ -659,7 +660,7 @@ class IColumn;
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::pread, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool. The 'io_uring' method is experimental and does not work for Log, TinyLog, StripeLog, File, Set and Join, and other tables with append-able files in presence of concurrent reads and writes.", 0) \
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \

View File

@ -80,6 +80,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},

View File

@ -310,6 +310,57 @@ private:
{
ThreadStatus thread_status;
/// First log those fields that are safe to access and that should not cause a new fault.
/// That way we will have some duplicated info in the log, but we don't lose important info
/// in case of a double fault.
LOG_FATAL(log, "########## Short fault info ############");
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) Received signal {}",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, sig);
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "Signal description: {}", signal_description);
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
String bare_stacktrace_str;
if (stack_trace.getSize())
{
/// Write bare stack trace (addresses) just in case if we will fail to print symbolized stack trace.
/// NOTE: This still requires memory allocations and a mutex lock inside the logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
bare_stacktrace_str = bare_stacktrace.str();
}
/// Now try to access potentially unsafe data in thread_ptr.
String query_id;
String query;
@ -326,16 +377,6 @@ private:
}
}
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "########################################");
if (query_id.empty())
@ -351,30 +392,11 @@ private:
thread_num, query_id, query, signal_description, sig);
}
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
if (stack_trace.getSize())
if (!bare_stacktrace_str.empty())
{
/// Write bare stack trace (addresses) just in case if we will fail to print symbolized stack trace.
/// NOTE: This still require memory allocations and mutex lock inside logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
LOG_FATAL(log, fmt::runtime(bare_stacktrace_str));
}
/// Write symbolized stack trace line by line for better grep-ability.
@ -1101,6 +1123,7 @@ void BaseDaemon::setupWatchdog()
if (0 == pid)
{
updateCurrentThreadIdAfterFork();
logger().information("Forked a child process to watch");
#if defined(OS_LINUX)
if (0 != prctl(PR_SET_PDEATHSIG, SIGKILL))

View File

@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -182,6 +183,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
auto ast = parseQueryFromMetadata(log, getContext(), full_path.string(), /*throw_on_error*/ true, /*remove_empty*/ false);
if (ast)
{
FunctionNameNormalizer().visit(ast.get());
auto * create_query = ast->as<ASTCreateQuery>();
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name));

View File

@ -549,16 +549,17 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
for (size_t i = 0; i < key_index_to_state_from_storage.size(); ++i)
{
if (key_index_to_state_from_storage[i].isExpired()
|| key_index_to_state_from_storage[i].isNotFound())
if (key_index_to_state_from_storage[i].isExpired() || key_index_to_state_from_storage[i].isNotFound())
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
requested_keys_vector.emplace_back(requested_keys[i]);
else
requested_complex_key_rows.emplace_back(i);
auto requested_key = requested_keys[i];
not_found_keys.insert(requested_key);
auto [_, inserted] = not_found_keys.insert(requested_key);
if (inserted)
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
requested_keys_vector.emplace_back(requested_keys[i]);
else
requested_complex_key_rows.emplace_back(i);
}
}
}

View File

@ -266,7 +266,7 @@ public:
}
UInt64 getSize() const override { return reservation->getSize(); }
UInt64 getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
std::optional<UInt64> getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
DiskPtr getDisk(size_t i) const override
{

View File

@ -312,17 +312,17 @@ public:
}
}
UInt64 getTotalSpace() const override
std::optional<UInt64> getTotalSpace() const override
{
return delegate->getTotalSpace();
}
UInt64 getAvailableSpace() const override
std::optional<UInt64> getAvailableSpace() const override
{
return delegate->getAvailableSpace();
}
UInt64 getUnreservedSpace() const override
std::optional<UInt64> getUnreservedSpace() const override
{
return delegate->getUnreservedSpace();
}

View File

@ -78,7 +78,7 @@ public:
{}
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override
{
@ -175,8 +175,11 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(DiskLocal::reservation_mutex);
UInt64 available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space
? *available_space - std::min(*available_space, reserved_bytes)
: std::numeric_limits<UInt64>::max();
if (bytes == 0)
{
@ -187,12 +190,24 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
if (unreserved_space >= bytes)
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}, having unreserved {}.",
ReadableSize(bytes),
backQuote(name),
ReadableSize(unreserved_space));
if (available_space)
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}, having unreserved {}.",
ReadableSize(bytes),
backQuote(name),
ReadableSize(unreserved_space));
}
else
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}.",
ReadableSize(bytes),
backQuote(name));
}
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
@ -218,14 +233,14 @@ static UInt64 getTotalSpaceByName(const String & name, const String & disk_path,
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getTotalSpace() const
std::optional<UInt64> DiskLocal::getTotalSpace() const
{
if (broken || readonly)
return 0;
return getTotalSpaceByName(name, disk_path, keep_free_space_bytes);
}
UInt64 DiskLocal::getAvailableSpace() const
std::optional<UInt64> DiskLocal::getAvailableSpace() const
{
if (broken || readonly)
return 0;
@ -242,10 +257,10 @@ UInt64 DiskLocal::getAvailableSpace() const
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getUnreservedSpace() const
std::optional<UInt64> DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(DiskLocal::reservation_mutex);
auto available_space = getAvailableSpace();
auto available_space = *getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}

View File

@ -35,11 +35,9 @@ public:
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
std::optional<UInt64> getTotalSpace() const override;
std::optional<UInt64> getAvailableSpace() const override;
std::optional<UInt64> getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }

View File

@ -140,13 +140,13 @@ public:
const String & getName() const override { return name; }
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
virtual std::optional<UInt64> getTotalSpace() const = 0;
/// Space currently available on the disk.
virtual UInt64 getAvailableSpace() const = 0;
virtual std::optional<UInt64> getAvailableSpace() const = 0;
/// Space available for reservation (available space minus reserved space).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Amount of bytes which should be kept free on the disk.
virtual UInt64 getKeepingFreeSpace() const { return 0; }
@ -495,7 +495,7 @@ public:
/// Space available for reservation
/// (with this reservation already take into account).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Get i-th disk where reservation take place.
virtual DiskPtr getDisk(size_t i = 0) const = 0; /// NOLINT

View File

@ -74,22 +74,19 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { range.left, range.right },
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -498,7 +495,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(*current_file_segment, read_type);
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -521,7 +518,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front(), read_type);
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -90,8 +90,6 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -49,11 +49,18 @@ IVolume::IVolume(
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Volume must contain at least one disk");
}
UInt64 IVolume::getMaxUnreservedFreeSpace() const
std::optional<UInt64> IVolume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
std::optional<UInt64> res;
for (const auto & disk : disks)
res = std::max(res, disk->getUnreservedSpace());
{
auto disk_unreserved_space = disk->getUnreservedSpace();
if (!disk_unreserved_space)
return std::nullopt; /// There is at least one unlimited disk.
if (!res || *disk_unreserved_space > *res)
res = disk_unreserved_space;
}
return res;
}

View File

@ -74,7 +74,7 @@ public:
virtual VolumeType getType() const = 0;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
std::optional<UInt64> getMaxUnreservedFreeSpace() const;
DiskPtr getDisk() const { return getDisk(0); }
virtual DiskPtr getDisk(size_t i) const { return disks[i]; }

View File

@ -410,18 +410,25 @@ void DiskObjectStorage::removeSharedRecursive(
transaction->commit();
}
std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
bool DiskObjectStorage::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (!available_space)
{
++reservation_count;
reserved_bytes += bytes;
return true;
}
UInt64 unreserved_space = *available_space - std::min(*available_space, reserved_bytes);
if (bytes == 0)
{
LOG_TRACE(log, "Reserved 0 bytes on remote disk {}", backQuote(name));
++reservation_count;
return {unreserved_space};
return true;
}
if (unreserved_space >= bytes)
@ -434,14 +441,14 @@ std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
return true;
}
else
{
LOG_TRACE(log, "Could not reserve {} on remote disk {}. Not enough unreserved space", ReadableSize(bytes), backQuote(name));
}
return {};
return false;
}
bool DiskObjectStorage::supportsCache() const

View File

@ -53,11 +53,9 @@ public:
const std::string & getCacheName() const override { return object_storage->getCacheName(); }
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
std::optional<UInt64> getTotalSpace() const override { return {}; }
std::optional<UInt64> getAvailableSpace() const override { return {}; }
std::optional<UInt64> getUnreservedSpace() const override { return {}; }
UInt64 getKeepingFreeSpace() const override { return 0; }
@ -224,7 +222,7 @@ private:
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::optional<UInt64> tryReserve(UInt64 bytes);
bool tryReserve(UInt64 bytes);
const bool send_metadata;
@ -244,7 +242,7 @@ public:
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override;

View File

@ -149,7 +149,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -168,7 +168,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
clients.get()->client,
bucket,
path,
version_id,
@ -218,7 +218,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
{
auto settings_ptr = s3_settings.get();
return std::make_unique<ReadBufferFromS3>(
client.get(),
clients.get()->client,
bucket,
object.remote_path,
version_id,
@ -243,8 +243,10 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto clients_ = clients.get();
return std::make_unique<WriteBufferFromS3>(
client.get(),
clients_->client,
clients_->client_with_long_timeout,
bucket,
object.remote_path,
buf_size,
@ -258,7 +260,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
}
@ -266,7 +268,7 @@ ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefi
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
@ -307,7 +309,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
@ -333,7 +335,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
}
else
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
@ -394,7 +396,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -410,7 +412,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
ObjectMetadata result;
result.size_bytes = object_info.size;
@ -429,7 +431,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
@ -445,7 +447,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
@ -458,35 +460,33 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
s3_settings.set(std::move(s3_settings_));
}
void S3ObjectStorage::setNewClient(std::unique_ptr<S3::Client> && client_)
{
client.set(std::move(client_));
}
void S3ObjectStorage::shutdown()
{
auto client_ptr = client.get();
auto clients_ptr = clients.get();
/// This call stops any further retry attempts for ongoing S3 requests.
/// If an S3 request has failed and the method below is executed, the S3 client immediately returns the outcome of the last failed S3 request.
/// If S3 is healthy, nothing bad will happen and S3 requests will be processed in the regular way without errors.
/// This should significantly speed up the shutdown process if S3 is unhealthy.
const_cast<S3::Client &>(*client_ptr).DisableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
}
void S3ObjectStorage::startup()
{
auto client_ptr = client.get();
auto clients_ptr = clients.get();
/// Need to be enabled if it was disabled during shutdown() call.
const_cast<S3::Client &>(*client_ptr).EnableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
}
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
s3_settings.set(std::move(new_s3_settings));
client.set(std::move(new_client));
clients.set(std::move(new_clients));
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
@ -501,7 +501,9 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
endpoint);
}
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
}
#endif

View File

@ -39,6 +39,16 @@ struct S3ObjectStorageSettings
class S3ObjectStorage : public IObjectStorage
{
public:
struct Clients
{
std::shared_ptr<S3::Client> client;
std::shared_ptr<S3::Client> client_with_long_timeout;
Clients() = default;
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
};
private:
friend class S3PlainObjectStorage;
@ -51,7 +61,7 @@ private:
String bucket_,
String connection_string)
: bucket(bucket_)
, client(std::move(client_))
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
@ -159,14 +169,12 @@ public:
private:
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
void setNewClient(std::unique_ptr<S3::Client> && client_);
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
std::string bucket;
MultiVersion<S3::Client> client;
MultiVersion<Clients> clients;
MultiVersion<S3ObjectStorageSettings> s3_settings;
S3Capabilities s3_capabilities;

View File

@ -129,7 +129,7 @@ std::unique_ptr<S3::Client> getClient(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;

View File

@ -209,10 +209,17 @@ DiskPtr StoragePolicy::tryGetDiskByName(const String & disk_name) const
UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
std::optional<UInt64> res;
for (const auto & volume : volumes)
res = std::max(res, volume->getMaxUnreservedFreeSpace());
return res;
{
auto volume_unreserved_space = volume->getMaxUnreservedFreeSpace();
if (!volume_unreserved_space)
return -1ULL; /// There is at least one unlimited disk.
if (!res || *volume_unreserved_space > *res)
res = volume_unreserved_space;
}
return res.value_or(-1ULL);
}
@ -248,22 +255,37 @@ ReservationPtr StoragePolicy::reserveAndCheck(UInt64 bytes) const
ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
{
UInt64 max_space = 0;
bool found_bottomless_disk = false;
DiskPtr max_disk;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->getDisks())
{
auto avail_space = disk->getAvailableSpace();
if (avail_space > max_space)
auto available_space = disk->getAvailableSpace();
if (!available_space)
{
max_space = avail_space;
max_disk = disk;
found_bottomless_disk = true;
break;
}
if (*available_space > max_space)
{
max_space = *available_space;
max_disk = disk;
}
}
if (found_bottomless_disk)
break;
}
if (!max_disk)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "There is no space on any disk in storage policy: {}. "
"It's likely all disks are broken", name);
auto reservation = max_disk->reserve(0);
if (!reservation)
{

View File

@ -40,20 +40,28 @@ VolumeJBOD::VolumeJBOD(
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
if (ratio < 0)
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "'max_data_part_size_ratio' has to be not less than 0.");
UInt64 sum_size = 0;
std::vector<UInt64> sizes;
for (const auto & disk : disks)
{
sizes.push_back(disk->getTotalSpace());
sum_size += sizes.back();
auto size = disk->getTotalSpace();
if (size)
sum_size += *size;
else
break;
sizes.push_back(*size);
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
if (sizes.size() == disks.size())
{
if (sizes[i] < max_data_part_size)
max_data_part_size = static_cast<UInt64>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
LOG_WARNING(logger, "Disk {} on volume {} does not have enough space ({}) to contain a part of size max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
if (sizes[i] < max_data_part_size)
{
LOG_WARNING(logger, "Disk {} on volume {} does not have enough space ({}) to contain a part of size max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
}
}
}
}
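
A worked instance of the formula above, with invented disk sizes (the static_assert only documents the arithmetic):

#include <cstdint>

constexpr uint64_t GiB = 1ULL << 30;
/// Three disks of 100, 200 and 300 GiB and max_data_part_size_ratio = 0.5:
constexpr uint64_t sum_size = (100 + 200 + 300) * GiB;
constexpr uint64_t max_data_part_size = static_cast<uint64_t>(sum_size * 0.5 / 3);
static_assert(max_data_part_size == 100 * GiB);
/// Any disk smaller than 100 GiB would trigger the LOG_WARNING above for that disk.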

View File

@ -68,7 +68,7 @@ private:
struct DiskWithSize
{
DiskPtr disk;
uint64_t free_size = 0;
std::optional<UInt64> free_size = 0;
DiskWithSize(DiskPtr disk_)
: disk(disk_)
@ -80,7 +80,7 @@ private:
return free_size < rhs.free_size;
}
ReservationPtr reserve(uint64_t bytes)
ReservationPtr reserve(UInt64 bytes)
{
ReservationPtr reservation = disk->reserve(bytes);
if (!reservation)

View File

@ -56,7 +56,7 @@ void loadDiskLocalConfig(const String & name,
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
keep_free_space_bytes = static_cast<UInt64>(*DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
}
}

View File

@ -155,7 +155,6 @@ public:
if (!((executeType<UInt8>(result_column, arguments, input_rows_count))
|| (executeType<UInt16>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt64>(result_column, arguments, input_rows_count))
|| (executeType<Int8>(result_column, arguments, input_rows_count))
|| (executeType<Int16>(result_column, arguments, input_rows_count))

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
/** Token search in the string: the needle must be surrounded by separator characters, such as whitespace or punctuation.
*/
template <typename Name, typename TokenSearcher, bool negate>
template <typename Name, typename Searcher, bool negate>
struct HasTokenImpl
{
using ResultType = UInt8;
@ -46,7 +46,7 @@ struct HasTokenImpl
const UInt8 * const end = haystack_data.data() + haystack_data.size();
const UInt8 * pos = begin;
if (!ASCIICaseSensitiveTokenSearcher::isValidNeedle(pattern.data(), pattern.size()))
if (!std::none_of(pattern.begin(), pattern.end(), isTokenSeparator))
{
if (res_null)
{
@ -58,7 +58,8 @@ struct HasTokenImpl
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Needle must not contain whitespace or separator characters");
}
TokenSearcher searcher(pattern.data(), pattern.size(), end - pos);
size_t pattern_size = pattern.size();
Searcher searcher(pattern.data(), pattern_size, end - pos);
if (res_null)
std::ranges::fill(res_null->getData(), false);
@ -67,21 +68,31 @@ struct HasTokenImpl
/// We will search for the next occurrence in all rows at once.
while (pos < end && end != (pos = searcher.search(pos, end - pos)))
{
/// Let's determine which index it refers to.
while (begin + haystack_offsets[i] <= pos)
/// The found substring is a token
if ((pos == begin || isTokenSeparator(pos[-1]))
&& (pos + pattern_size == end || isTokenSeparator(pos[pattern_size])))
{
res[i] = negate;
/// Let's determine which index it refers to.
while (begin + haystack_offsets[i] <= pos)
{
res[i] = negate;
++i;
}
/// We check that the entry does not pass through the boundaries of strings.
if (pos + pattern.size() < begin + haystack_offsets[i])
res[i] = !negate;
else
res[i] = negate;
pos = begin + haystack_offsets[i];
++i;
}
/// We check that the entry does not pass through the boundaries of strings.
if (pos + pattern.size() < begin + haystack_offsets[i])
res[i] = !negate;
else
res[i] = negate;
pos = begin + haystack_offsets[i];
++i;
{
/// Not a token. Jump over it.
pos += pattern_size;
}
}
/// Tail, in which there can be no substring.
@ -113,6 +124,12 @@ struct HasTokenImpl
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Function '{}' doesn't support FixedString haystack argument", name);
}
private:
static bool isTokenSeparator(UInt8 c)
{
return isASCII(c) && !isAlphaNumericASCII(c);
}
};
}
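
A naive reference version of the token-boundary semantics implemented above, for illustration only — the real code pairs a Volnitsky searcher with this boundary check, and isAlphaNumericASCII is approximated here with std::isalnum:

#include <cctype>
#include <string_view>

static bool isTokenSeparatorNaive(unsigned char c)
{
    return c < 0x80 && !std::isalnum(c); /// ASCII and not alphanumeric
}

static bool hasTokenNaive(std::string_view haystack, std::string_view needle)
{
    for (size_t pos = haystack.find(needle); pos != std::string_view::npos;
         pos = haystack.find(needle, pos + 1))
    {
        /// A match counts only if it is delimited by separators or string bounds.
        bool left_ok = pos == 0 || isTokenSeparatorNaive(haystack[pos - 1]);
        bool right_ok = pos + needle.size() == haystack.size()
            || isTokenSeparatorNaive(haystack[pos + needle.size()]);
        if (left_ok && right_ok)
            return true;
    }
    return false;
}
/// hasTokenNaive("abc,def", "abc") == true; hasTokenNaive("abcdef", "abc") == false.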

View File

@ -20,7 +20,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast)
@ -139,12 +138,6 @@ ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & f
if (!user_defined_function)
return nullptr;
/// All UDFs are not parametric for now.
if (function.parameters)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function.name);
}
const auto & function_arguments_list = function.children.at(0)->as<ASTExpressionList>();
auto & function_arguments = function_arguments_list->children;

View File

@ -22,19 +22,19 @@ namespace
struct FilesystemAvailable
{
static constexpr auto name = "filesystemAvailable";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getAvailableSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getAvailableSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
struct FilesystemUnreserved
{
static constexpr auto name = "filesystemUnreserved";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getUnreservedSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getUnreservedSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
struct FilesystemCapacity
{
static constexpr auto name = "filesystemCapacity";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getTotalSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getTotalSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
template <typename Impl>

View File

@ -6,6 +6,7 @@
namespace DB
{
struct NameHasToken
{
static constexpr auto name = "hasToken";
@ -17,9 +18,9 @@ struct NameHasTokenOrNull
};
using FunctionHasToken
= FunctionsStringSearch<HasTokenImpl<NameHasToken, VolnitskyCaseSensitiveToken, false>>;
= FunctionsStringSearch<HasTokenImpl<NameHasToken, Volnitsky, false>>;
using FunctionHasTokenOrNull
= FunctionsStringSearch<HasTokenImpl<NameHasTokenOrNull, VolnitskyCaseSensitiveToken, false>, ExecutionErrorPolicy::Null>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenOrNull, Volnitsky, false>, ExecutionErrorPolicy::Null>;
REGISTER_FUNCTION(HasToken)
{

View File

@ -6,6 +6,7 @@
namespace DB
{
struct NameHasTokenCaseInsensitive
{
static constexpr auto name = "hasTokenCaseInsensitive";
@ -17,9 +18,9 @@ struct NameHasTokenCaseInsensitiveOrNull
};
using FunctionHasTokenCaseInsensitive
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitive, VolnitskyCaseInsensitiveToken, false>>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitive, VolnitskyCaseInsensitive, false>>;
using FunctionHasTokenCaseInsensitiveOrNull
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitiveOrNull, VolnitskyCaseInsensitiveToken, false>, ExecutionErrorPolicy::Null>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitiveOrNull, VolnitskyCaseInsensitive, false>, ExecutionErrorPolicy::Null>;
REGISTER_FUNCTION(HasTokenCaseInsensitive)
{

View File

@ -100,7 +100,7 @@ std::unique_ptr<Client> Client::create(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
{
@ -109,9 +109,16 @@ std::unique_ptr<Client> Client::create(
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
}
std::unique_ptr<Client> Client::create(const Client & other)
std::unique_ptr<Client> Client::clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy,
std::optional<Int64> override_request_timeout_ms) const
{
return std::unique_ptr<Client>(new Client(other));
PocoHTTPClientConfiguration new_configuration = client_configuration;
if (override_retry_strategy.has_value())
new_configuration.retryStrategy = *override_retry_strategy;
if (override_request_timeout_ms.has_value())
new_configuration.requestTimeoutMs = *override_request_timeout_ms;
return std::unique_ptr<Client>(new Client(*this, new_configuration));
}
namespace
@ -134,11 +141,14 @@ Client::Client(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
: Aws::S3::S3Client(credentials_provider_, client_configuration, std::move(sign_payloads), use_virtual_addressing)
const PocoHTTPClientConfiguration & client_configuration_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads_,
bool use_virtual_addressing_)
: Aws::S3::S3Client(credentials_provider_, client_configuration_, sign_payloads_, use_virtual_addressing_)
, credentials_provider(credentials_provider_)
, client_configuration(client_configuration_)
, sign_payloads(sign_payloads_)
, use_virtual_addressing(use_virtual_addressing_)
, max_redirects(max_redirects_)
, sse_kms_config(std::move(sse_kms_config_))
, log(&Poco::Logger::get("S3Client"))
@ -175,10 +185,15 @@ Client::Client(
ClientCacheRegistry::instance().registerClient(cache);
}
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
Client::Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration_)
: Aws::S3::S3Client(other.credentials_provider, client_configuration_, other.sign_payloads,
other.use_virtual_addressing)
, initial_endpoint(other.initial_endpoint)
, credentials_provider(other.credentials_provider)
, client_configuration(client_configuration_)
, sign_payloads(other.sign_payloads)
, use_virtual_addressing(other.use_virtual_addressing)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, provider_type(other.provider_type)

View File

@ -105,6 +105,8 @@ private:
class Client : private Aws::S3::S3Client
{
public:
class RetryStrategy;
/// we use a factory method to verify arguments before creating a client because
/// there are certain requirements on arguments for it to work correctly
/// e.g. Client::RetryStrategy should be used
@ -112,11 +114,19 @@ public:
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
static std::unique_ptr<Client> create(const Client & other);
/// Create a client with adjusted settings:
/// * override_retry_strategy can be used to disable retries to avoid nested retries when we have
/// a retry loop outside of S3 client. Specifically, for read and write buffers. Currently not
/// actually used.
/// * override_request_timeout_ms is used to increase timeout for CompleteMultipartUploadRequest
/// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321
std::unique_ptr<Client> clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy = std::nullopt,
std::optional<Int64> override_request_timeout_ms = std::nullopt) const;
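
A usage sketch for the clone() declared above, assuming an already constructed client; the timeout literal is illustrative — the Clients constructor earlier in this diff passes request_settings.long_request_timeout_ms:

#include <memory>
#include <optional>

std::shared_ptr<S3::Client> makeLongTimeoutClient(const S3::Client & client)
{
    /// Keep the retry strategy as-is and only raise the request timeout,
    /// which matters for CompleteMultipartUploadRequest: it can sit idle for ~10 seconds.
    return client.clone(/* override_retry_strategy = */ std::nullopt,
                        /* override_request_timeout_ms = */ 30000);
}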
Client & operator=(const Client &) = delete;
@ -211,11 +221,12 @@ private:
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration& client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
Client(const Client & other);
Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration);
/// Leave regular functions private so we don't accidentally use them
/// otherwise region and endpoint redirection won't work
@ -251,6 +262,9 @@ private:
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
PocoHTTPClientConfiguration client_configuration;
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads;
bool use_virtual_addressing;
std::string explicit_region;
mutable bool detect_region = true;

View File

@ -89,6 +89,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
DB::S3Settings::RequestSettings request_settings;
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
DB::WriteBufferFromS3 write_buffer(
client,
client,
uri.bucket,
uri.key,

View File

@ -6,7 +6,6 @@ namespace DB
namespace ErrorCodes
{
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int BAD_ARGUMENTS;
}
void throwReadAfterEOF()
@ -14,12 +13,4 @@ void throwReadAfterEOF()
throw Exception(ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF, "Attempt to read after eof");
}
void throwValueTooLargeForVarIntEncoding(UInt64 x)
{
/// Under practical circumstances, we should virtually never end up here but AST Fuzzer manages to create superlarge input integers
/// which trigger this exception. Intentionally not throwing LOGICAL_ERROR or calling abort() or [ch]assert(false), so AST Fuzzer
/// can swallow the exception and continue to run.
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Value {} is too large for VarInt encoding", x);
}
}

View File

@ -12,24 +12,63 @@ namespace DB
/// Variable-Length Quantity (VLQ) Base-128 compression, also known as Variable Byte (VB) or Varint encoding.
/// Write UInt64 in variable length format (base128)
void writeVarUInt(UInt64 x, std::ostream & ostr);
void writeVarUInt(UInt64 x, WriteBuffer & ostr);
char * writeVarUInt(UInt64 x, char * ostr);
/// Read UInt64, written in variable length format (base128)
void readVarUInt(UInt64 & x, std::istream & istr);
void readVarUInt(UInt64 & x, ReadBuffer & istr);
const char * readVarUInt(UInt64 & x, const char * istr, size_t size);
/// Get the length of a variable-length-encoded integer
size_t getLengthOfVarUInt(UInt64 x);
size_t getLengthOfVarInt(Int64 x);
[[noreturn]] void throwReadAfterEOF();
[[noreturn]] void throwValueTooLargeForVarIntEncoding(UInt64 x);
/// Write UInt64 in variable length format (base128)
inline void writeVarUInt(UInt64 x, WriteBuffer & ostr)
{
while (x > 0x7F)
{
uint8_t byte = 0x80 | (x & 0x7F);
ostr.nextIfAtEnd();
*ostr.position() = byte;
++ostr.position();
x >>= 7;
}
uint8_t final_byte = static_cast<uint8_t>(x);
ostr.nextIfAtEnd();
*ostr.position() = final_byte;
++ostr.position();
}
inline void writeVarUInt(UInt64 x, std::ostream & ostr)
{
while (x > 0x7F)
{
uint8_t byte = 0x80 | (x & 0x7F);
ostr.put(byte);
x >>= 7;
}
uint8_t final_byte = static_cast<uint8_t>(x);
ostr.put(final_byte);
}
inline char * writeVarUInt(UInt64 x, char * ostr)
{
while (x > 0x7F)
{
uint8_t byte = 0x80 | (x & 0x7F);
*ostr = byte;
++ostr;
x >>= 7;
}
uint8_t final_byte = static_cast<uint8_t>(x);
*ostr = final_byte;
++ostr;
return ostr;
}
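
A byte-level trace of the encoder above for x = 300, wired through the char* overload; the asserts document the expected bytes:

#include <cassert>
#include <cstdint>

void traceWriteVarUInt300()
{
    char buf[10];
    char * end = writeVarUInt(300, buf);
    assert(end - buf == 2);
    assert(static_cast<uint8_t>(buf[0]) == 0xAC); /// 0x80 | (300 & 0x7F)
    assert(static_cast<uint8_t>(buf[1]) == 0x02); /// 300 >> 7; clear high bit marks the last byte
}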
template <typename Out>
inline void writeVarInt(Int64 x, Out & ostr)
{
@ -41,8 +80,71 @@ inline char * writeVarInt(Int64 x, char * ostr)
return writeVarUInt(static_cast<UInt64>((x << 1) ^ (x >> 63)), ostr);
}
namespace impl
{
template <bool check_eof>
inline void readVarUInt(UInt64 & x, ReadBuffer & istr)
{
x = 0;
for (size_t i = 0; i < 10; ++i)
{
if constexpr (check_eof)
if (istr.eof()) [[unlikely]]
throwReadAfterEOF();
UInt64 byte = *istr.position();
++istr.position();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return;
}
}
}
inline void readVarUInt(UInt64 & x, ReadBuffer & istr)
{
if (istr.buffer().end() - istr.position() >= 10)
return impl::readVarUInt<false>(x, istr);
return impl::readVarUInt<true>(x, istr);
}
inline void readVarUInt(UInt64 & x, std::istream & istr)
{
x = 0;
for (size_t i = 0; i < 10; ++i)
{
UInt64 byte = istr.get();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return;
}
}
inline const char * readVarUInt(UInt64 & x, const char * istr, size_t size)
{
const char * end = istr + size;
x = 0;
for (size_t i = 0; i < 10; ++i)
{
if (istr == end) [[unlikely]]
throwReadAfterEOF();
UInt64 byte = *istr;
++istr;
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return istr;
}
return istr;
}
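
And a round trip through the same char-buffer overloads, now valid for the full 64-bit range thanks to the tenth byte:

#include <cassert>
#include <cstdint>

void varUIntRoundTrip()
{
    char buf[10];
    const uint64_t original = 0xFFFFFFFFFFFFFFFFULL; /// would have been rejected before this change
    char * end = writeVarUInt(original, buf);
    uint64_t decoded = 0;
    const char * next = readVarUInt(decoded, buf, static_cast<size_t>(end - buf));
    assert(decoded == original && next == end);
}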
/// Read Int64, written in variable length format (base128)
template <typename In>
inline void readVarInt(Int64 & x, In & istr)
{
@ -57,9 +159,6 @@ inline const char * readVarInt(Int64 & x, const char * istr, size_t size)
return res;
}
/// For [U]Int32, [U]Int16, size_t.
inline void readVarUInt(UInt32 & x, ReadBuffer & istr)
{
UInt64 tmp;
@ -97,137 +196,6 @@ inline void readVarUInt(T & x, ReadBuffer & istr)
x = tmp;
}
template <bool fast>
inline void readVarUIntImpl(UInt64 & x, ReadBuffer & istr)
{
x = 0;
for (size_t i = 0; i < 9; ++i)
{
if constexpr (!fast)
if (istr.eof()) [[unlikely]]
throwReadAfterEOF();
UInt64 byte = *istr.position();
++istr.position();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return;
}
}
inline void readVarUInt(UInt64 & x, ReadBuffer & istr)
{
if (istr.buffer().end() - istr.position() >= 9)
return readVarUIntImpl<true>(x, istr);
return readVarUIntImpl<false>(x, istr);
}
inline void readVarUInt(UInt64 & x, std::istream & istr)
{
x = 0;
for (size_t i = 0; i < 9; ++i)
{
UInt64 byte = istr.get();
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return;
}
}
inline const char * readVarUInt(UInt64 & x, const char * istr, size_t size)
{
const char * end = istr + size;
x = 0;
for (size_t i = 0; i < 9; ++i)
{
if (istr == end) [[unlikely]]
throwReadAfterEOF();
UInt64 byte = *istr;
++istr;
x |= (byte & 0x7F) << (7 * i);
if (!(byte & 0x80))
return istr;
}
return istr;
}
/// NOTE: Due to historical reasons, only values up to 1<<63-1 can be safely encoded/decoded (bigger values are not idempotent under
/// encoding/decoding). This cannot be changed without breaking backward compatibility (some drivers, e.g. clickhouse-rs (Rust), have the
/// same limitation, others support the full 1<<64 range, e.g. clickhouse-driver (Python))
constexpr UInt64 VAR_UINT_MAX = (1ULL<<63) - 1;
inline void writeVarUInt(UInt64 x, WriteBuffer & ostr)
{
if (x > VAR_UINT_MAX) [[unlikely]]
throwValueTooLargeForVarIntEncoding(x);
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;
if (x > 0x7F)
byte |= 0x80;
ostr.nextIfAtEnd();
*ostr.position() = byte;
++ostr.position();
x >>= 7;
if (!x)
return;
}
}
inline void writeVarUInt(UInt64 x, std::ostream & ostr)
{
if (x > VAR_UINT_MAX) [[unlikely]]
throwValueTooLargeForVarIntEncoding(x);
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;
if (x > 0x7F)
byte |= 0x80;
ostr.put(byte);
x >>= 7;
if (!x)
return;
}
}
inline char * writeVarUInt(UInt64 x, char * ostr)
{
if (x > VAR_UINT_MAX) [[unlikely]]
throwValueTooLargeForVarIntEncoding(x);
for (size_t i = 0; i < 9; ++i)
{
uint8_t byte = x & 0x7F;
if (x > 0x7F)
byte |= 0x80;
*ostr = byte;
++ostr;
x >>= 7;
if (!x)
return ostr;
}
return ostr;
}
inline size_t getLengthOfVarUInt(UInt64 x)
{
return x < (1ULL << 7) ? 1
@ -238,7 +206,8 @@ inline size_t getLengthOfVarUInt(UInt64 x)
: (x < (1ULL << 42) ? 6
: (x < (1ULL << 49) ? 7
: (x < (1ULL << 56) ? 8
: 9)))))));
: (x < (1ULL << 63) ? 9
: 10))))))));
}
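
A few boundary checks for the widened length function (plain asserts, since the function is not constexpr):

#include <cassert>

void checkVarUIntLengths()
{
    assert(getLengthOfVarUInt(0) == 1);
    assert(getLengthOfVarUInt((1ULL << 7) - 1) == 1);  /// 127 still fits in one byte
    assert(getLengthOfVarUInt(1ULL << 7) == 2);
    assert(getLengthOfVarUInt((1ULL << 63) - 1) == 9); /// the old VAR_UINT_MAX
    assert(getLengthOfVarUInt(1ULL << 63) == 10);      /// the new tenth byte
    assert(getLengthOfVarUInt(~0ULL) == 10);
}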

View File

@ -77,6 +77,7 @@ struct WriteBufferFromS3::PartData
WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
@ -91,6 +92,7 @@ WriteBufferFromS3::WriteBufferFromS3(
, upload_settings(request_settings.getUploadSettings())
, write_settings(write_settings_)
, client_ptr(std::move(client_ptr_))
, client_with_long_timeout_ptr(std::move(client_with_long_timeout_ptr_))
, object_metadata(std::move(object_metadata_))
, buffer_allocation_policy(ChooseBufferPolicy(upload_settings))
, task_tracker(
@ -552,7 +554,7 @@ void WriteBufferFromS3::completeMultipartUpload()
ProfileEvents::increment(ProfileEvents::DiskS3CompleteMultipartUpload);
Stopwatch watch;
auto outcome = client_ptr->CompleteMultipartUpload(req);
auto outcome = client_with_long_timeout_ptr->CompleteMultipartUpload(req);
watch.stop();
ProfileEvents::increment(ProfileEvents::WriteBufferFromS3Microseconds, watch.elapsedMicroseconds());

View File

@ -29,6 +29,8 @@ class WriteBufferFromS3 final : public WriteBufferFromFileBase
public:
WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
/// for CompleteMultipartUploadRequest, because it blocks on recv() for a few seconds on big uploads
std::shared_ptr<const S3::Client> client_with_long_timeout_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
@ -86,6 +88,7 @@ private:
const S3Settings::RequestSettings::PartUploadSettings & upload_settings;
const WriteSettings write_settings;
const std::shared_ptr<const S3::Client> client_ptr;
const std::shared_ptr<const S3::Client> client_with_long_timeout_ptr;
const std::optional<std::map<String, String>> object_metadata;
Poco::Logger * log = &Poco::Logger::get("WriteBufferFromS3");

View File

@ -526,6 +526,7 @@ public:
getAsyncPolicy().setAutoExecute(false);
return std::make_unique<WriteBufferFromS3>(
client,
client,
bucket,
file_name,

View File

@ -465,8 +465,12 @@ void ActionsDAG::removeUnusedActions(const Names & required_names, bool allow_re
void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_constant_folding)
{
std::unordered_set<const Node *> visited_nodes;
std::unordered_set<const Node *> used_inputs;
std::stack<Node *> stack;
for (const auto * input : inputs)
used_inputs.insert(input);
for (const auto * node : outputs)
{
visited_nodes.insert(node);
@ -484,7 +488,7 @@ void ActionsDAG::removeUnusedActions(bool allow_remove_inputs, bool allow_consta
stack.push(&node);
}
if (node.type == ActionType::INPUT && !allow_remove_inputs)
if (node.type == ActionType::INPUT && !allow_remove_inputs && used_inputs.contains(&node))
visited_nodes.insert(&node);
}
@ -1365,8 +1369,8 @@ ActionsDAGPtr ActionsDAG::merge(ActionsDAG && first, ActionsDAG && second)
{
first.mergeInplace(std::move(second));
/// Drop unused inputs and, probably, some actions.
first.removeUnusedActions();
/// Some actions could become unused. Do not drop inputs to preserve the header.
first.removeUnusedActions(false);
return std::make_shared<ActionsDAG>(std::move(first));
}

View File

@ -78,7 +78,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
static NamesAndTypesList::iterator findColumn(const String & name, NamesAndTypesList & cols)
@ -1106,12 +1105,6 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
}
}
/// Normal functions are not parametric for now.
if (node.parameters)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", node.name);
}
Names argument_names;
DataTypes argument_types;
bool arguments_present = true;

View File

@ -806,13 +806,6 @@ bool FileCache::tryReserve(FileSegment & file_segment, const size_t size)
return true;
}
void FileCache::removeKey(const Key & key)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeAll();
}
void FileCache::removeKeyIfExists(const Key & key)
{
assertInitialized();
@ -825,14 +818,7 @@ void FileCache::removeKeyIfExists(const Key & key)
/// But if we have multiple replicated zero-copy tables on the same server
/// it became possible to start removing something from cache when it is used
/// by other "zero-copy" tables. That is why it's not an error.
locked_key->removeAll(/* if_releasable */true);
}
void FileCache::removeFileSegment(const Key & key, size_t offset)
{
assertInitialized();
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
locked_key->removeFileSegment(offset);
locked_key->removeAllReleasable();
}
void FileCache::removePathIfExists(const String & path)
@ -844,12 +830,22 @@ void FileCache::removeAllReleasable()
{
assertInitialized();
metadata.iterate([](LockedKey & locked_key) { locked_key.removeAll(/* if_releasable */true); });
auto lock = lockCache();
main_priority->iterate([&](LockedKey & locked_key, const FileSegmentMetadataPtr & segment_metadata)
{
if (segment_metadata->releasable())
{
auto file_segment = segment_metadata->file_segment;
locked_key.removeFileSegment(file_segment->offset(), file_segment->lock());
return PriorityIterationResult::REMOVE_AND_CONTINUE;
}
return PriorityIterationResult::CONTINUE;
}, lock);
if (stash)
{
/// Remove all access information.
auto lock = lockCache();
stash->records.clear();
stash->queue->removeAll(lock);
}
@ -919,7 +915,7 @@ void FileCache::loadMetadata()
continue;
}
const auto key = Key::fromKeyString(key_directory.filename().string());
const auto key = Key(unhexUInt<UInt128>(key_directory.filename().string().data()));
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::CREATE_EMPTY, /* is_initial_load */true);
for (fs::directory_iterator offset_it{key_directory}; offset_it != fs::directory_iterator(); ++offset_it)
@ -1074,7 +1070,7 @@ FileSegmentsHolderPtr FileCache::getSnapshot()
FileSegmentsHolderPtr FileCache::getSnapshot(const Key & key)
{
FileSegments file_segments;
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW_LOGICAL);
auto locked_key = metadata.lockKeyMetadata(key, CacheMetadata::KeyNotFoundPolicy::THROW);
for (const auto & [_, file_segment_metadata] : *locked_key->getKeyMetadata())
file_segments.push_back(FileSegment::getSnapshot(file_segment_metadata->file_segment));
return std::make_unique<FileSegmentsHolder>(std::move(file_segments));

View File

@ -83,19 +83,13 @@ public:
FileSegmentsHolderPtr set(const Key & key, size_t offset, size_t size, const CreateFileSegmentSettings & settings);
/// Remove file segment by `key` and `offset`. Throws if file segment does not exist.
void removeFileSegment(const Key & key, size_t offset);
/// Remove files by `key`. Throws if key does not exist.
void removeKey(const Key & key);
/// Remove files by `key`.
/// Remove files by `key`. Removes files which might be used at the moment.
void removeKeyIfExists(const Key & key);
/// Removes files by `path`.
/// Removes files by `path`. Removes files which might be used at the moment.
void removePathIfExists(const String & path);
/// Remove files by `key`.
/// Remove files by `key`. Will not remove files which are used at the moment.
void removeAllReleasable();
std::vector<String> tryGetCachePaths(const Key & key);

View File

@ -28,9 +28,4 @@ FileCacheKey FileCacheKey::random()
return FileCacheKey(UUIDHelpers::generateV4().toUnderType());
}
FileCacheKey FileCacheKey::fromKeyString(const std::string & key_str)
{
return FileCacheKey(unhexUInt<UInt128>(key_str.data()));
}
}

View File

@ -21,8 +21,6 @@ struct FileCacheKey
static FileCacheKey random();
bool operator==(const FileCacheKey & other) const { return key == other.key; }
static FileCacheKey fromKeyString(const std::string & key_str);
};
using FileCacheKeyAndOffset = std::pair<FileCacheKey, size_t>;

View File

@ -25,7 +25,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int BAD_ARGUMENTS;
}
FileSegmentMetadata::FileSegmentMetadata(FileSegmentPtr && file_segment_)
@ -192,8 +191,6 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
if (it == end())
{
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
return nullptr;
@ -218,8 +215,6 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
return locked_metadata;
if (key_not_found_policy == KeyNotFoundPolicy::THROW)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "No such key `{}` in cache", key);
else if (key_not_found_policy == KeyNotFoundPolicy::THROW_LOGICAL)
throw Exception(ErrorCodes::LOGICAL_ERROR, "No such key `{}` in cache", key);
if (key_not_found_policy == KeyNotFoundPolicy::RETURN_NULL)
@ -563,11 +558,11 @@ bool LockedKey::isLastOwnerOfFileSegment(size_t offset) const
return file_segment_metadata->file_segment.use_count() == 2;
}
void LockedKey::removeAll(bool if_releasable)
void LockedKey::removeAllReleasable()
{
for (auto it = key_metadata->begin(); it != key_metadata->end();)
{
if (if_releasable && !it->second->releasable())
if (!it->second->releasable())
{
++it;
continue;
@ -588,32 +583,17 @@ void LockedKey::removeAll(bool if_releasable)
}
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "There is no offset {}", offset);
auto file_segment = it->second->file_segment;
return removeFileSegmentImpl(it, file_segment->lock());
}
KeyMetadata::iterator LockedKey::removeFileSegment(size_t offset, const FileSegmentGuard::Lock & segment_lock)
{
auto it = key_metadata->find(offset);
if (it == key_metadata->end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no offset {}", offset);
return removeFileSegmentImpl(it, segment_lock);
}
KeyMetadata::iterator LockedKey::removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock & segment_lock)
{
auto file_segment = it->second->file_segment;
LOG_DEBUG(
key_metadata->log, "Remove from cache. Key: {}, offset: {}, size: {}",
getKey(), file_segment->offset(), file_segment->reserved_size);
getKey(), offset, file_segment->reserved_size);
chassert(file_segment->assertCorrectnessUnlocked(segment_lock));

View File

@ -87,7 +87,7 @@ struct CacheMetadata : public std::unordered_map<FileCacheKey, KeyMetadataPtr>,
{
public:
using Key = FileCacheKey;
using IterateCacheMetadataFunc = std::function<void(LockedKey &)>;
using IterateCacheMetadataFunc = std::function<void(const LockedKey &)>;
explicit CacheMetadata(const std::string & path_);
@ -106,7 +106,6 @@ public:
enum class KeyNotFoundPolicy
{
THROW,
THROW_LOGICAL,
CREATE_EMPTY,
RETURN_NULL,
};
@ -170,10 +169,9 @@ struct LockedKey : private boost::noncopyable
std::shared_ptr<const KeyMetadata> getKeyMetadata() const { return key_metadata; }
std::shared_ptr<KeyMetadata> getKeyMetadata() { return key_metadata; }
void removeAll(bool if_releasable = true);
void removeAllReleasable();
KeyMetadata::iterator removeFileSegment(size_t offset, const FileSegmentGuard::Lock &);
KeyMetadata::iterator removeFileSegment(size_t offset);
void shrinkFileSegmentToDownloadedSize(size_t offset, const FileSegmentGuard::Lock &);
@ -190,8 +188,6 @@ struct LockedKey : private boost::noncopyable
std::string toString() const;
private:
KeyMetadata::iterator removeFileSegmentImpl(KeyMetadata::iterator it, const FileSegmentGuard::Lock &);
const std::shared_ptr<KeyMetadata> key_metadata;
KeyGuard::Lock lock; /// `lock` must be destructed before `key_metadata`.
};

View File

@ -30,6 +30,7 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
extern const int INVALID_SHARD_ID;
extern const int NO_SUCH_REPLICA;
extern const int BAD_ARGUMENTS;
}
namespace
@ -524,7 +525,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
++current_shard_num;
}
@ -552,7 +553,7 @@ Cluster::Cluster(
addresses_with_failover.emplace_back(current);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num);
addShard(settings, std::move(current), params.treat_local_as_remote, current_shard_num, /* insert_paths= */ {}, /* weight= */ 1);
++current_shard_num;
}
@ -614,6 +615,12 @@ Poco::Timespan Cluster::saturate(Poco::Timespan v, Poco::Timespan limit)
void Cluster::initMisc()
{
/// NOTE: It is possible to have cluster w/o shards for
/// optimize_skip_unused_shards (i.e. WHERE 0 expression), so check the
/// slots only if shards is not empty.
if (!shards_info.empty() && slot_to_shard.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Cluster with zero weight on all shards is prohibited");
for (const auto & shard_info : shards_info)
{
if (!shard_info.isLocal() && !shard_info.hasRemoteConnections())
@ -708,6 +715,7 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
ShardInfo info;
info.shard_num = ++shard_num;
info.weight = 1;
if (address.is_local)
info.local_addresses.push_back(address);
@ -733,6 +741,8 @@ Cluster::Cluster(Cluster::ReplicasAsShardsTag, const Cluster & from, const Setti
info.per_replica_pools = {std::move(pool)};
addresses_with_failover.emplace_back(Addresses{address});
slot_to_shard.insert(std::end(slot_to_shard), info.weight, shards_info.size());
shards_info.emplace_back(std::move(info));
}
};
@ -762,7 +772,11 @@ Cluster::Cluster(Cluster::SubclusterTag, const Cluster & from, const std::vector
{
for (size_t index : indices)
{
shards_info.emplace_back(from.shards_info.at(index));
const auto & from_shard = from.shards_info.at(index);
if (from_shard.weight)
slot_to_shard.insert(std::end(slot_to_shard), from_shard.weight, shards_info.size());
shards_info.emplace_back(from_shard);
if (!from.addresses_with_failover.empty())
addresses_with_failover.emplace_back(from.addresses_with_failover.at(index));
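
A compact illustration of the slot_to_shard bookkeeping these hunks keep consistent; the weights are invented:

#include <cstddef>
#include <cstdint>
#include <vector>

/// Weights {2, 1} produce slots {0, 0, 1}: shard 0 receives two thirds of the
/// rows, shard 1 one third; a row lands in slot hash % slot_to_shard.size().
std::vector<size_t> buildSlotToShard(const std::vector<uint64_t> & weights)
{
    std::vector<size_t> slot_to_shard;
    for (size_t shard = 0; shard < weights.size(); ++shard)
        slot_to_shard.insert(slot_to_shard.end(), weights[shard], shard);
    return slot_to_shard;
}

If every weight is zero, the vector stays empty and hash % 0 would be undefined — exactly the case the new BAD_ARGUMENTS check in initMisc() prevents.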

View File

@ -176,6 +176,15 @@ namespace ErrorCodes
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
#define SHUTDOWN(log, desc, ptr, method) do \
{ \
if (ptr) \
{ \
LOG_DEBUG(log, "Shutting down " desc); \
(ptr)->method; \
} \
} while (false) \
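
The do { ... } while (false) wrapper is the standard trick that makes a multi-statement macro behave as a single statement, so it composes safely with an unbraced if/else; a minimal illustration (the is_server condition is invented):

/// Both branches expand correctly even without braces:
if (is_server)
    SHUTDOWN(log, "system logs", system_logs, shutdown());
else
    LOG_DEBUG(log, "nothing to shut down");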
/** Set of known objects (environment), that could be used in query.
* Shared (global) part. Order of members (especially, order of destruction) is very important.
@ -479,35 +488,29 @@ struct ContextSharedPart : boost::noncopyable
/// Stop periodic reloading of the configuration files.
/// This must be done first because otherwise the reloading may pass a changed config
/// to some destroyed parts of ContextSharedPart.
if (external_dictionaries_loader)
external_dictionaries_loader->enablePeriodicUpdates(false);
if (external_user_defined_executable_functions_loader)
external_user_defined_executable_functions_loader->enablePeriodicUpdates(false);
if (user_defined_sql_objects_loader)
user_defined_sql_objects_loader->stopWatching();
SHUTDOWN(log, "dictionaries loader", external_dictionaries_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "UDFs loader", external_user_defined_executable_functions_loader, enablePeriodicUpdates(false));
SHUTDOWN(log, "another UDFs loader", user_defined_sql_objects_loader, stopWatching());
LOG_TRACE(log, "Shutting down named sessions");
Session::shutdownNamedSessions();
/// Waiting for current backups/restores to be finished. This must be done before `DatabaseCatalog::shutdown()`.
if (backups_worker)
backups_worker->shutdown();
SHUTDOWN(log, "backups worker", backups_worker, shutdown());
/** After system_logs have been shut down it is guaranteed that no system table gets created or written to.
* Note that part changes at shutdown won't be logged to part log.
*/
if (system_logs)
system_logs->shutdown();
SHUTDOWN(log, "system logs", system_logs, shutdown());
LOG_TRACE(log, "Shutting down database catalog");
DatabaseCatalog::shutdown();
if (merge_mutate_executor)
merge_mutate_executor->wait();
if (fetch_executor)
fetch_executor->wait();
if (moves_executor)
moves_executor->wait();
if (common_executor)
common_executor->wait();
SHUTDOWN(log, "merges executor", merge_mutate_executor, wait());
SHUTDOWN(log, "fetches executor", fetch_executor, wait());
SHUTDOWN(log, "moves executor", moves_executor, wait());
SHUTDOWN(log, "common executor", common_executor, wait());
TransactionLog::shutdownIfAny();
@ -533,10 +536,12 @@ struct ContextSharedPart : boost::noncopyable
/// DDLWorker should be deleted without lock, cause its internal thread can
/// take it as well, which will cause deadlock.
LOG_TRACE(log, "Shutting down DDLWorker");
delete_ddl_worker.reset();
/// Background operations in cache use background schedule pool.
/// Deactivate them before destructing it.
LOG_TRACE(log, "Shutting down caches");
const auto & caches = FileCacheFactory::instance().getAll();
for (const auto & [_, cache] : caches)
cache->cache->deactivateBackgroundOperations();

View File

@ -56,6 +56,7 @@ namespace ErrorCodes
extern const int DATABASE_ACCESS_DENIED;
extern const int LOGICAL_ERROR;
extern const int HAVE_DEPENDENT_OBJECTS;
extern const int UNFINISHED;
}
TemporaryTableHolder::TemporaryTableHolder(ContextPtr context_, const TemporaryTableHolder::Creator & creator, const ASTPtr & query)
@ -196,6 +197,9 @@ void DatabaseCatalog::startupBackgroundCleanup()
void DatabaseCatalog::shutdownImpl()
{
is_shutting_down = true;
wait_table_finally_dropped.notify_all();
if (cleanup_task)
(*cleanup_task)->deactivate();
@ -227,9 +231,11 @@ void DatabaseCatalog::shutdownImpl()
databases_with_delayed_shutdown.push_back(database.second);
continue;
}
LOG_TRACE(log, "Shutting down database {}", database.first);
database.second->shutdown();
}
LOG_TRACE(log, "Shutting down system databases");
for (auto & database : databases_with_delayed_shutdown)
{
database->shutdown();
@ -1161,8 +1167,13 @@ void DatabaseCatalog::waitTableFinallyDropped(const UUID & uuid)
std::unique_lock lock{tables_marked_dropped_mutex};
wait_table_finally_dropped.wait(lock, [&]() TSA_REQUIRES(tables_marked_dropped_mutex) -> bool
{
return !tables_marked_dropped_ids.contains(uuid);
return !tables_marked_dropped_ids.contains(uuid) || is_shutting_down;
});
/// TSA doesn't support unique_lock
if (TSA_SUPPRESS_WARNING_FOR_READ(tables_marked_dropped_ids).contains(uuid))
throw Exception(ErrorCodes::UNFINISHED, "Did not finish dropping the table with UUID {} because the server is shutting down, "
"will finish after restart", uuid);
}
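
A generic form of the wake-up-on-shutdown pattern added above, with invented names — the predicate also checks the shutdown flag, and shutdownImpl() calls notify_all(), so waiters cannot block server shutdown forever:

#include <condition_variable>
#include <mutex>
#include <stdexcept>

std::mutex m;
std::condition_variable cv;
bool dropped = false;
bool shutting_down = false;

void waitDroppedOrGiveUp()
{
    std::unique_lock lock(m);
    cv.wait(lock, [] { return dropped || shutting_down; });
    if (!dropped)
        throw std::runtime_error("server is shutting down");
}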
void DatabaseCatalog::addDependencies(

View File

@ -308,6 +308,8 @@ private:
Poco::Logger * log;
std::atomic_bool is_shutting_down = false;
/// Do not allow simultaneous execution of DDL requests on the same table.
/// database name -> database guard -> (table name mutex, counter),
/// counter: how many threads are running a query on the table at the same time

View File

@ -40,8 +40,6 @@ NamesAndTypesList FilesystemCacheLogElement::getNamesAndTypes()
{"source_file_path", std::make_shared<DataTypeString>()},
{"file_segment_range", std::make_shared<DataTypeTuple>(types)},
{"total_requested_range", std::make_shared<DataTypeTuple>(types)},
{"key", std::make_shared<DataTypeString>()},
{"offset", std::make_shared<DataTypeUInt64>()},
{"size", std::make_shared<DataTypeUInt64>()},
{"read_type", std::make_shared<DataTypeString>()},
{"read_from_cache_attempted", std::make_shared<DataTypeUInt8>()},
@ -62,8 +60,6 @@ void FilesystemCacheLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(source_file_path);
columns[i++]->insert(Tuple{file_segment_range.first, file_segment_range.second});
columns[i++]->insert(Tuple{requested_range.first, requested_range.second});
columns[i++]->insert(file_segment_key);
columns[i++]->insert(file_segment_offset);
columns[i++]->insert(file_segment_size);
columns[i++]->insert(typeToString(cache_type));
columns[i++]->insert(read_from_cache_attempted);

View File

@ -11,7 +11,16 @@
namespace DB
{
///
/// -------- Column --------- Type ------
/// | event_date | DateTime |
/// | event_time | UInt64 |
/// | query_id | String |
/// | remote_file_path | String |
/// | segment_range | Tuple |
/// | read_type | String |
/// -------------------------------------
///
struct FilesystemCacheLogElement
{
enum class CacheType
@ -30,8 +39,6 @@ struct FilesystemCacheLogElement
std::pair<size_t, size_t> file_segment_range{};
std::pair<size_t, size_t> requested_range{};
CacheType cache_type{};
std::string file_segment_key;
size_t file_segment_offset;
size_t file_segment_size;
bool read_from_cache_attempted;
String read_buffer_id;

View File

@ -1079,6 +1079,7 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
// Table SQL definition is available even if the table is detached (even permanently)
auto query = database->getCreateTableQuery(create.getTable(), getContext());
FunctionNameNormalizer().visit(query.get());
auto create_query = query->as<ASTCreateQuery &>();
if (!create.is_dictionary && create_query.is_dictionary)

View File

@ -370,18 +370,7 @@ BlockIO InterpreterSystemQuery::execute()
else
{
auto cache = FileCacheFactory::instance().getByName(query.filesystem_cache_name).cache;
if (query.key_to_drop.empty())
{
cache->removeAllReleasable();
}
else
{
auto key = FileCacheKey::fromKeyString(query.key_to_drop);
if (query.offset_to_drop.has_value())
cache->removeFileSegment(key, query.offset_to_drop.value());
else
cache->removeKey(key);
}
cache->removeAllReleasable();
}
break;
}
@ -470,16 +459,6 @@ BlockIO InterpreterSystemQuery::execute()
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_USERS);
system_context->getAccessControl().reload(AccessControl::ReloadMode::ALL);
break;
case Type::RELOAD_SYMBOLS:
{
#if defined(__ELF__) && !defined(OS_FREEBSD)
getContext()->checkAccess(AccessType::SYSTEM_RELOAD_SYMBOLS);
SymbolIndex::reload();
break;
#else
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "SYSTEM RELOAD SYMBOLS is not supported on current platform");
#endif
}
case Type::STOP_MERGES:
startStopAction(ActionLocks::PartsMerge, false);
break;
@ -1056,11 +1035,6 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_RELOAD_USERS);
break;
}
case Type::RELOAD_SYMBOLS:
{
required_access.emplace_back(AccessType::SYSTEM_RELOAD_SYMBOLS);
break;
}
case Type::STOP_MERGES:
case Type::START_MERGES:
{

View File

@ -191,14 +191,21 @@ void ServerAsynchronousMetrics::updateImpl(AsynchronousMetricValues & new_values
auto available = disk->getAvailableSpace();
auto unreserved = disk->getUnreservedSpace();
new_values[fmt::format("DiskTotal_{}", name)] = { total,
"The total size in bytes of the disk (virtual filesystem). Remote filesystems can show a large value like 16 EiB." };
new_values[fmt::format("DiskUsed_{}", name)] = { total - available,
"Used bytes on the disk (virtual filesystem). Remote filesystems not always provide this information." };
new_values[fmt::format("DiskAvailable_{}", name)] = { available,
"Available bytes on the disk (virtual filesystem). Remote filesystems can show a large value like 16 EiB." };
new_values[fmt::format("DiskUnreserved_{}", name)] = { unreserved,
"Available bytes on the disk (virtual filesystem) without the reservations for merges, fetches, and moves. Remote filesystems can show a large value like 16 EiB." };
new_values[fmt::format("DiskTotal_{}", name)] = { *total,
"The total size in bytes of the disk (virtual filesystem). Remote filesystems may not provide this information." };
if (available)
{
new_values[fmt::format("DiskUsed_{}", name)] = { *total - *available,
"Used bytes on the disk (virtual filesystem). Remote filesystems not always provide this information." };
new_values[fmt::format("DiskAvailable_{}", name)] = { *available,
"Available bytes on the disk (virtual filesystem). Remote filesystems may not provide this information." };
}
if (unreserved)
new_values[fmt::format("DiskUnreserved_{}", name)] = { *unreserved,
"Available bytes on the disk (virtual filesystem) without the reservations for merges, fetches, and moves. Remote filesystems may not provide this information." };
}
}

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnVector.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <type_traits>
@ -12,13 +13,19 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
template <typename T>
IColumn::Selector createBlockSelector(
const IColumn & column,
const std::vector<UInt64> & slots)
{
const auto total_weight = slots.size();
assert(total_weight != 0);
if (total_weight == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "weight is zero");
size_t num_rows = column.size();
IColumn::Selector selector(num_rows);

View File

@ -322,8 +322,8 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// This does not have impact on the final span logs, because these internal queries are issued by external queries,
/// we still have enough span logs for the execution of external queries.
std::shared_ptr<OpenTelemetry::SpanHolder> query_span = internal ? nullptr : std::make_shared<OpenTelemetry::SpanHolder>("query");
if (query_span)
LOG_DEBUG(&Poco::Logger::get("executeQuery"), "Query span trace_id for opentelemetry log: {}", query_span->trace_id);
if (query_span && query_span->trace_id != UUID{})
LOG_TRACE(&Poco::Logger::get("executeQuery"), "Query span trace_id for opentelemetry log: {}", query_span->trace_id);
auto query_start_time = std::chrono::system_clock::now();

View File

@ -44,6 +44,7 @@ ASTPtr ASTColumnDeclaration::clone() const
res->ttl = ttl->clone();
res->children.push_back(res->ttl);
}
if (collation)
{
res->collation = collation->clone();
@ -76,6 +77,10 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
<< (*null_modifier ? "" : "NOT ") << "NULL" << (settings.hilite ? hilite_none : "");
}
if (primary_key_specifier)
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "")
<< "PRIMARY KEY" << (settings.hilite ? hilite_none : "");
if (default_expression)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << default_specifier << (settings.hilite ? hilite_none : "");

View File

@ -21,6 +21,7 @@ public:
ASTPtr codec;
ASTPtr ttl;
ASTPtr collation;
bool primary_key_specifier = false;
String getID(char delim) const override { return "ColumnDeclaration" + (delim + name); }

View File

@ -56,6 +56,7 @@ public:
ASTExpressionList * constraints = nullptr;
ASTExpressionList * projections = nullptr;
IAST * primary_key = nullptr;
IAST * primary_key_from_columns = nullptr;
String getID(char) const override { return "Columns definition"; }
@ -76,7 +77,7 @@ public:
f(reinterpret_cast<void **>(&primary_key));
f(reinterpret_cast<void **>(&constraints));
f(reinterpret_cast<void **>(&projections));
f(reinterpret_cast<void **>(&primary_key));
f(reinterpret_cast<void **>(&primary_key_from_columns));
}
};

View File

@ -210,15 +210,7 @@ void ASTSystemQuery::formatImpl(const FormatSettings & settings, FormatState &,
else if (type == Type::DROP_FILESYSTEM_CACHE)
{
if (!filesystem_cache_name.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " " << filesystem_cache_name;
if (!key_to_drop.empty())
{
settings.ostr << (settings.hilite ? hilite_none : "") << " KEY " << key_to_drop;
if (offset_to_drop.has_value())
settings.ostr << (settings.hilite ? hilite_none : "") << " OFFSET " << offset_to_drop.value();
}
}
}
else if (type == Type::UNFREEZE)
{

Some files were not shown because too many files have changed in this diff.