Merge branch 'master' into less-logs-2

This commit is contained in:
Alexey Milovidov 2023-07-09 08:49:44 +03:00 committed by GitHub
commit 59eadca95c
251 changed files with 2775 additions and 1707 deletions

View File

@ -75,51 +75,6 @@ jobs:
Codebrowser:
needs: [DockerHubPush]
uses: ./.github/workflows/woboq.yml
BuilderCoverity:
needs: DockerHubPush
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
BUILD_NAME=coverity
CACHES_PATH=${{runner.temp}}/../ccaches
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
TEMP_PATH=${{runner.temp}}/build_check
EOF
echo "COVERITY_TOKEN=${{ secrets.COVERITY_TOKEN }}" >> "$GITHUB_ENV"
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload Coverity Analysis
if: ${{ success() || failure() }}
run: |
curl --form token="${COVERITY_TOKEN}" \
--form email='security+coverity@clickhouse.com' \
--form file="@$TEMP_PATH/$BUILD_NAME/coverity-scan.tar.gz" \
--form version="${GITHUB_REF#refs/heads/}-${GITHUB_SHA::6}" \
--form description="Nighly Scan: $(date +'%Y-%m-%dT%H:%M:%S')" \
https://scan.coverity.com/builds?project=ClickHouse%2FClickHouse
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
SonarCloud:
runs-on: [self-hosted, builder]
env:

View File

@ -87,7 +87,6 @@ if (ENABLE_FUZZING)
set (ENABLE_CLICKHOUSE_ODBC_BRIDGE OFF)
set (ENABLE_LIBRARIES 0)
set (ENABLE_SSL 1)
set (USE_UNWIND ON)
set (ENABLE_EMBEDDED_COMPILER 0)
set (ENABLE_EXAMPLES 0)
set (ENABLE_UTILS 0)
@ -344,9 +343,9 @@ if (COMPILER_CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-absolute-paths")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-absolute-paths")
if (NOT ENABLE_TESTS AND NOT SANITIZE)
if (NOT ENABLE_TESTS AND NOT SANITIZE AND OS_LINUX)
# https://clang.llvm.org/docs/ThinLTO.html
# Applies to clang only.
# Applies to clang and linux only.
# Disabled when building with tests or sanitizers.
option(ENABLE_THINLTO "Clang-specific link time optimization" ON)
endif()

View File

@ -15,25 +15,34 @@
static thread_local uint64_t current_tid = 0;
static void setCurrentThreadId()
{
#if defined(OS_ANDROID)
current_tid = gettid();
#elif defined(OS_LINUX)
current_tid = static_cast<uint64_t>(syscall(SYS_gettid)); /// This call is always successful. - man gettid
#elif defined(OS_FREEBSD)
current_tid = pthread_getthreadid_np();
#elif defined(OS_SUNOS)
// On Solaris-derived systems, this returns the ID of the LWP, analogous
// to a thread.
current_tid = static_cast<uint64_t>(pthread_self());
#else
if (0 != pthread_threadid_np(nullptr, &current_tid))
throw std::logic_error("pthread_threadid_np returned error");
#endif
}
uint64_t getThreadId()
{
if (!current_tid)
{
#if defined(OS_ANDROID)
current_tid = gettid();
#elif defined(OS_LINUX)
current_tid = static_cast<uint64_t>(syscall(SYS_gettid)); /// This call is always successful. - man gettid
#elif defined(OS_FREEBSD)
current_tid = pthread_getthreadid_np();
#elif defined(OS_SUNOS)
// On Solaris-derived systems, this returns the ID of the LWP, analogous
// to a thread.
current_tid = static_cast<uint64_t>(pthread_self());
#else
if (0 != pthread_threadid_np(nullptr, &current_tid))
throw std::logic_error("pthread_threadid_np returned error");
#endif
}
setCurrentThreadId();
return current_tid;
}
void updateCurrentThreadIdAfterFork()
{
setCurrentThreadId();
}
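
The thread id is now computed once per thread and refreshed explicitly after `fork()`. A minimal sketch of how the new hook could be wired up, assuming registration through `pthread_atfork` (the registration site is not part of this diff):

```cpp
#include <pthread.h>
#include <cstdint>

uint64_t getThreadId();                 // declared in the accompanying header
void updateCurrentThreadIdAfterFork();  // ditto

// Hypothetical wiring: refresh the cached thread id in the child process.
// Without this, getThreadId() in the child would keep returning the
// parent's id from the stale thread-local cache.
static void installForkHandler()
{
    pthread_atfork(/* prepare */ nullptr, /* parent */ nullptr,
                   /* child */ updateCurrentThreadIdAfterFork);
}
```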

View File

@ -3,3 +3,5 @@
/// Obtain thread id from OS. The value is cached in thread local variable.
uint64_t getThreadId();
void updateCurrentThreadIdAfterFork();

View File

@ -15,6 +15,7 @@ set(CMAKE_OSX_DEPLOYMENT_TARGET 10.15)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
include (cmake/unwind.cmake)
include (cmake/cxx.cmake)
link_libraries(global-group)

View File

@ -18,6 +18,9 @@ if (NOT PARALLEL_COMPILE_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_COMPILER_MEMORY)
if (NOT PARALLEL_COMPILE_JOBS)
set (PARALLEL_COMPILE_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_COMPILE_JOBS_LESS TRUE)
endif()
endif ()
if (PARALLEL_COMPILE_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES))
@ -33,6 +36,9 @@ if (NOT PARALLEL_LINK_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_LINKER_MEMORY)
if (NOT PARALLEL_LINK_JOBS)
set (PARALLEL_LINK_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_LINK_JOBS_LESS TRUE)
endif()
endif ()
# ThinLTO provides its own parallel linking
@ -56,4 +62,10 @@ if (PARALLEL_COMPILE_JOBS OR PARALLEL_LINK_JOBS)
message(STATUS
"${CMAKE_CURRENT_SOURCE_DIR}: Have ${TOTAL_PHYSICAL_MEMORY} megabytes of memory.
Limiting concurrent linker jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS} (system has ${NUMBER_OF_LOGICAL_CORES} logical cores)")
if (PARALLEL_COMPILE_JOBS_LESS)
message(WARNING "The autocalculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
if (PARALLEL_LINK_JOBS_LESS)
message(WARNING "The autocalculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()
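
For illustration (the numbers are hypothetical): with `TOTAL_PHYSICAL_MEMORY` of 32768 MB and a `MAX_LINKER_MEMORY` budget of 4096 MB per job, the limit comes out to 32768 / 4096 = 8 parallel link jobs; on a 16-core machine, 8 < 16, so `PARALLEL_LINK_JOBS_LESS` is set and the new warning fires, pointing the user at the `PARALLEL_LINK_JOBS` override.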

View File

@ -40,7 +40,6 @@ if (CMAKE_CROSSCOMPILING)
set (OPENSSL_NO_ASM ON CACHE INTERNAL "")
set (ENABLE_JEMALLOC ON CACHE INTERNAL "")
set (ENABLE_PARQUET OFF CACHE INTERNAL "")
set (USE_UNWIND OFF CACHE INTERNAL "")
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_HDFS OFF CACHE INTERNAL "")
set (ENABLE_MYSQL OFF CACHE INTERNAL "")

View File

@ -1,13 +1 @@
option (USE_UNWIND "Enable libunwind (better stacktraces)" ${ENABLE_LIBRARIES})
if (USE_UNWIND)
add_subdirectory(contrib/libunwind-cmake)
set (UNWIND_LIBRARIES unwind)
set (EXCEPTION_HANDLING_LIBRARY ${UNWIND_LIBRARIES})
message (STATUS "Using libunwind: ${UNWIND_LIBRARIES}")
else ()
set (EXCEPTION_HANDLING_LIBRARY gcc_eh)
endif ()
message (STATUS "Using exception handler: ${EXCEPTION_HANDLING_LIBRARY}")
add_subdirectory(contrib/libunwind-cmake)

View File

@ -170,16 +170,13 @@ endif ()
target_compile_definitions(_jemalloc PRIVATE -DJEMALLOC_PROF=1)
if (USE_UNWIND)
# jemalloc provides support for two different libunwind flavors: the original HP libunwind and the one coming with gcc / g++ / libstdc++.
# The latter is identified by `JEMALLOC_PROF_LIBGCC` and uses `_Unwind_Backtrace` method instead of `unw_backtrace`.
# At the moment, ClickHouse uses LLVM libunwind, which follows libgcc's way of backtracing.
# ClickHouse has to provide the `unw_backtrace` method by means of [commit 8e2b31e](https://github.com/ClickHouse/libunwind/commit/8e2b31e766dd502f6df74909e04a7dbdf5182eb1).
target_compile_definitions (_jemalloc PRIVATE -DJEMALLOC_PROF_LIBGCC=1)
target_link_libraries (_jemalloc PRIVATE unwind)
endif ()
# jemalloc provides support for two different libunwind flavors: the original HP libunwind and the one coming with gcc / g++ / libstdc++.
# The latter is identified by `JEMALLOC_PROF_LIBGCC` and uses `_Unwind_Backtrace` method instead of `unw_backtrace`.
# At the moment, ClickHouse uses LLVM libunwind, which follows libgcc's way of backtracing.
#
# ClickHouse has to provide the `unw_backtrace` method by means of [commit 8e2b31e](https://github.com/ClickHouse/libunwind/commit/8e2b31e766dd502f6df74909e04a7dbdf5182eb1).
target_compile_definitions (_jemalloc PRIVATE -DJEMALLOC_PROF_LIBGCC=1)
target_link_libraries (_jemalloc PRIVATE unwind)
# for RTLD_NEXT
target_compile_options(_jemalloc PRIVATE -D_GNU_SOURCE)

View File

@ -61,9 +61,7 @@ target_include_directories(cxx SYSTEM BEFORE PUBLIC $<$<COMPILE_LANGUAGE:CXX>:$
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
# Enable capturing stack traces for all exceptions.
if (USE_UNWIND)
target_compile_definitions(cxx PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
endif ()
target_compile_definitions(cxx PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
if (USE_MUSL)
target_compile_definitions(cxx PUBLIC -D_LIBCPP_HAS_MUSL_LIBC=1)

View File

@ -35,12 +35,10 @@ target_include_directories(cxxabi SYSTEM BEFORE
)
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.
target_link_libraries(cxxabi PUBLIC ${EXCEPTION_HANDLING_LIBRARY})
target_link_libraries(cxxabi PUBLIC unwind)
# Enable capturing stack traces for all exceptions.
if (USE_UNWIND)
target_compile_definitions(cxxabi PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
endif ()
target_compile_definitions(cxxabi PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
install(
TARGETS cxxabi

View File

@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -49,8 +49,8 @@ ENV CARGO_HOME=/rust/cargo
ENV PATH="/rust/cargo/bin:${PATH}"
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup toolchain install nightly && \
rustup default nightly && \
rustup toolchain install nightly-2023-07-04 && \
rustup default nightly-2023-07-04 && \
rustup component add rust-src && \
rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \

View File

@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.

View File

@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image

View File

@ -166,7 +166,6 @@ function run_cmake
"-DENABLE_UTILS=0"
"-DENABLE_EMBEDDED_COMPILER=0"
"-DENABLE_THINLTO=0"
"-DUSE_UNWIND=1"
"-DENABLE_NURAFT=1"
"-DENABLE_SIMDJSON=1"
"-DENABLE_JEMALLOC=1"

View File

@ -291,7 +291,7 @@ quit
if [ "$server_died" == 1 ]
then
# The server has died.
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*' server.log > description.txt
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*|.*Child process was terminated by signal 9.*' server.log > description.txt
then
echo "Lost connection to server. See the logs." > description.txt
fi

View File

@ -92,8 +92,8 @@ sudo clickhouse stop ||:
for _ in $(seq 1 60); do if [[ $(wget --timeout=1 -q 'localhost:8123' -O-) == 'Ok.' ]]; then sleep 1 ; else break; fi ; done
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
rg -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
zstd < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.zst &
# Compressed (FIXME: remove once only github actions will be left)
rm /var/log/clickhouse-server/clickhouse-server.log

View File

@ -33,7 +33,6 @@ RUN apt-get update -y \
qemu-user-static \
sqlite3 \
sudo \
telnet \
tree \
unixodbc \
wget \

View File

@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean

View File

@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean

View File

@ -44,7 +44,6 @@ RUN apt-get update \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
fakeroot \
gdb \
git \
gperf \

View File

@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v22.8.20.11-lts (c9ca79e24e8) FIXME as compared to v22.8.19.10-lts (989bc2fe8b0)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix broken index analysis when binary operator contains a null constant argument [#50177](https://github.com/ClickHouse/ClickHouse/pull/50177) ([Amos Bird](https://github.com/amosbird)).
* Fix incorrect constant folding [#50536](https://github.com/ClickHouse/ClickHouse/pull/50536) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix fuzzer failure in ActionsDAG [#51301](https://github.com/ClickHouse/ClickHouse/pull/51301) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix segfault in MathUnary [#51499](https://github.com/ClickHouse/ClickHouse/pull/51499) ([Ilya Yatsishin](https://github.com/qoega)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Decoupled commits from [#51180](https://github.com/ClickHouse/ClickHouse/issues/51180) for backports [#51561](https://github.com/ClickHouse/ClickHouse/pull/51561) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -0,0 +1,25 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.6.2.18-stable (89f39a7ccfe) FIXME as compared to v23.6.1.1524-stable (d1c7e13d088)
#### Build/Testing/Packaging Improvement
* Backported in [#51888](https://github.com/ClickHouse/ClickHouse/issues/51888): Update cargo dependencies. [#51721](https://github.com/ClickHouse/ClickHouse/pull/51721) ([Raúl Marín](https://github.com/Algunenano)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix reading from empty column in `parseSipHashKey` [#51804](https://github.com/ClickHouse/ClickHouse/pull/51804) ([Nikita Taranov](https://github.com/nickitat)).
* Allow parametric UDFs [#51964](https://github.com/ClickHouse/ClickHouse/pull/51964) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Remove the usage of Analyzer setting in the client [#51578](https://github.com/ClickHouse/ClickHouse/pull/51578) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix 02116_tuple_element with Analyzer [#51669](https://github.com/ClickHouse/ClickHouse/pull/51669) ([Robert Schulze](https://github.com/rschu1ze)).
* Fix SQLLogic docker images [#51719](https://github.com/ClickHouse/ClickHouse/pull/51719) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix source image for sqllogic [#51728](https://github.com/ClickHouse/ClickHouse/pull/51728) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Pin for docker-ce [#51743](https://github.com/ClickHouse/ClickHouse/pull/51743) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).

View File

@ -23,7 +23,7 @@ sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
``` bash
cd ClickHouse
mkdir build-riscv64
CC=clang-16 CXX=clang++-16 cmake . -Bbuild-riscv64 -G Ninja -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-riscv64.cmake -DGLIBC_COMPATIBILITY=OFF -DENABLE_LDAP=OFF -DOPENSSL_NO_ASM=ON -DENABLE_JEMALLOC=ON -DENABLE_PARQUET=OFF -DUSE_UNWIND=OFF -DENABLE_GRPC=OFF -DENABLE_HDFS=OFF -DENABLE_MYSQL=OFF
CC=clang-16 CXX=clang++-16 cmake . -Bbuild-riscv64 -G Ninja -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-riscv64.cmake -DGLIBC_COMPATIBILITY=OFF -DENABLE_LDAP=OFF -DOPENSSL_NO_ASM=ON -DENABLE_JEMALLOC=ON -DENABLE_PARQUET=OFF -DENABLE_GRPC=OFF -DENABLE_HDFS=OFF -DENABLE_MYSQL=OFF
ninja -C build-riscv64
```

View File

@ -33,6 +33,15 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
- `options` — MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering, please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
## Usage Example {#usage-example}
Create a table in ClickHouse which allows reading data from a MongoDB collection:

View File

@ -37,8 +37,8 @@ The [Merge](/docs/en/engines/table-engines/special/merge.md/#merge) engine does
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr1] [TTL expr1] [CODEC(codec1)] [[NOT] NULL|PRIMARY KEY],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr2] [TTL expr2] [CODEC(codec2)] [[NOT] NULL|PRIMARY KEY],
...
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],
@ -439,41 +439,41 @@ Syntax: `ngrambf_v1(n, size_of_bloom_filter_in_bytes, number_of_hash_functions,
- `number_of_hash_functions` — The number of hash functions used in the Bloom filter.
- `random_seed` — The seed for Bloom filter hash functions.
Users can create a [UDF](/docs/en/sql-reference/statements/create/function.md) to estimate the parameter set of `ngrambf_v1`. Query statements are as follows:
```sql
CREATE FUNCTION bfEstimateFunctions [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, size_of_bloom_filter_in_bits) -> round((size_of_bloom_filter_in_bits / total_nubmer_of_all_grams) * log(2));
CREATE FUNCTION bfEstimateBmSize [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, probability_of_false_positives) -> ceil((total_nubmer_of_all_grams * log(probability_of_false_positives)) / log(1 / pow(2, log(2))));
CREATE FUNCTION bfEstimateFalsePositive [ON CLUSTER cluster]
AS
(total_nubmer_of_all_grams, number_of_hash_functions, size_of_bloom_filter_in_bytes) -> pow(1 - exp(-number_of_hash_functions/ (size_of_bloom_filter_in_bytes / total_nubmer_of_all_grams)), number_of_hash_functions);
CREATE FUNCTION bfEstimateGramNumber [ON CLUSTER cluster]
AS
(number_of_hash_functions, probability_of_false_positives, size_of_bloom_filter_in_bytes) -> ceil(size_of_bloom_filter_in_bytes / (-number_of_hash_functions / log(1 - exp(log(probability_of_false_positives) / number_of_hash_functions))))
```
To use those functions, we need to specify at least two parameters.
For example, if there are 4300 ngrams in the granule and we expect false positives to be less than 0.0001, the other parameters can be estimated by executing the following queries:
```sql
--- estimate number of bits in the filter
SELECT bfEstimateBmSize(4300, 0.0001) / 8 as size_of_bloom_filter_in_bytes;
┌─size_of_bloom_filter_in_bytes─┐
│ 10304 │
└───────────────────────────────┘
--- estimate number of hash functions
SELECT bfEstimateFunctions(4300, bfEstimateBmSize(4300, 0.0001)) as number_of_hash_functions
┌─number_of_hash_functions─┐
│ 13 │
└──────────────────────────┘
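
As a sanity check, these outputs agree with the standard Bloom filter sizing formulas that the UDFs above encode (with `n = 4300` ngrams and false positive probability `p = 0.0001`; natural logarithms throughout):

$$
m = \left\lceil \frac{n\,|\ln p|}{(\ln 2)^2} \right\rceil = \left\lceil \frac{4300 \cdot 9.21}{0.4805} \right\rceil \approx 82432 \text{ bits} = 10304 \text{ bytes},
\qquad
k = \operatorname{round}\!\left(\frac{m}{n} \ln 2\right) = \operatorname{round}(19.17 \cdot 0.693) = 13.
$$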
@ -991,7 +991,7 @@ use a local disk to cache data from a table stored at a URL. Neither the cache d
nor the web storage is configured in the ClickHouse configuration files; both are
configured in the CREATE/ATTACH query settings.
In the settings highlighted below, notice that the disk of `type=web` is nested within
the disk of `type=cache`.
```sql
@ -1308,7 +1308,7 @@ configuration file.
In this sample configuration:
- the disk is of type `web`
- the data is hosted at `http://nginx:80/test1/`
- a cache on local storage is used
```xml
<clickhouse>

View File

@ -17,7 +17,8 @@ Default value: 0.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -30,7 +31,7 @@ insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
```sql
SELECT *
FROM table_1
SETTINGS additional_table_filters = (('table_1', 'x != 2'))
SETTINGS additional_table_filters = {'table_1': 'x != 2'}
```
```response
┌─x─┬─y────┐
@ -50,7 +51,8 @@ Default value: `''`.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -3535,7 +3537,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
## http_receive_timeout {#http_receive_timeout}
@ -3546,7 +3548,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
## check_query_single_value_result {#check_query_single_value_result}

View File

@ -0,0 +1,45 @@
---
slug: /en/operations/system-tables/jemalloc_bins
---
# jemalloc_bins
Contains information about memory allocations done via the jemalloc allocator in different size classes (bins), aggregated from all arenas.
These statistics might not be absolutely accurate because of thread local caching in jemalloc.
Columns:
- `index` (UInt64) — Index of the bin ordered by size
- `large` (Bool) — True for large allocations and False for small
- `size` (UInt64) — Size of allocations in this bin
- `allocations` (UInt64) — Number of allocations
- `deallocations` (UInt64) — Number of deallocations
**Example**
Find the sizes of allocations that contributed the most to the current overall memory usage.
``` sql
SELECT
*,
allocations - deallocations AS active_allocations,
size * active_allocations AS allocated_bytes
FROM system.jemalloc_bins
WHERE allocated_bytes > 0
ORDER BY allocated_bytes DESC
LIMIT 10
```
``` text
┌─index─┬─large─┬─────size─┬─allocations──┬─deallocations─┬─active_allocations─┬─allocated_bytes─┐
│ 82 │ 1 │ 50331648 │ 1 │ 0 │ 1 │ 50331648 │
│ 10 │ 0 │ 192 │ 512336 │ 370710 │ 141626 │ 27192192 │
│ 69 │ 1 │ 5242880 │ 6 │ 2 │ 4 │ 20971520 │
│ 3 │ 0 │ 48 │ 16938224 │ 16559484 │ 378740 │ 18179520 │
│ 28 │ 0 │ 4096 │ 122924 │ 119142 │ 3782 │ 15491072 │
│ 61 │ 1 │ 1310720 │ 44569 │ 44558 │ 11 │ 14417920 │
│ 39 │ 1 │ 28672 │ 1285 │ 913 │ 372 │ 10665984 │
│ 4 │ 0 │ 64 │ 2837225 │ 2680568 │ 156657 │ 10026048 │
│ 6 │ 0 │ 96 │ 2617803 │ 2531435 │ 86368 │ 8291328 │
│ 36 │ 1 │ 16384 │ 22431 │ 21970 │ 461 │ 7553024 │
└───────┴───────┴──────────┴──────────────┴───────────────┴────────────────────┴─────────────────┘
```

View File

@ -171,12 +171,13 @@ Result:
└──────────────────────────────┘
```
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type).
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type). It also requires the `execute_direct` option (to ensure no shell argument expansion vulnerability).
File `test_function_parameter_python.xml` (`/etc/clickhouse-server/test_function_parameter_python.xml` with default path settings).
```xml
<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>

View File

@ -30,6 +30,14 @@ mongodb(host:port, database, collection, user, password, structure [, options])
- `options` - MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering, please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
**Returned Value**

View File

@ -3,13 +3,6 @@ slug: /zh/development/build
---
# How to Build ClickHouse Release Packages {#ru-he-gou-jian-clickhouse-fa-bu-bao}
## Install Git and Pbuilder {#an-zhuang-git-he-pbuilder}
``` bash
sudo apt-get update
sudo apt-get install git pbuilder debhelper lsb-release fakeroot sudo debian-archive-keyring debian-keyring
```
## Pull the ClickHouse Source Code {#la-qu-clickhouse-yuan-ma}
``` bash

View File

@ -485,7 +485,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(*servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.", current_connections);

View File

@ -75,6 +75,15 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
}
void applySettingsOverridesForLocal(ContextMutablePtr context)
{
Settings settings = context->getSettings();
settings.allow_introspection_functions = true;
settings.storage_file_read_method = LocalFSReadMethod::mmap;
context->setSettings(settings);
}
void LocalServer::processError(const String &) const
{
@ -668,6 +677,12 @@ void LocalServer::processConfig()
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
#endif
/// NOTE: it is important to apply any overrides before
/// setDefaultProfiles() calls since it will copy current context (i.e.
/// there is separate context for Buffer tables).
applySettingsOverridesForLocal(global_context);
applyCmdOptions(global_context);
/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
@ -682,7 +697,6 @@ void LocalServer::processConfig()
std::string default_database = config().getString("default_database", "_local");
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
global_context->setCurrentDatabase(default_database);
applyCmdOptions(global_context);
if (config().has("path"))
{

View File

@ -1146,7 +1146,16 @@ try
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
if (merges_mutations_memory_usage_soft_limit == 0)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
@ -1523,7 +1532,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
@ -1618,7 +1627,7 @@ try
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if USE_UNWIND && !WITH_COVERAGE
#if !WITH_COVERAGE
/// Profilers cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
{
@ -1641,10 +1650,6 @@ try
/// Describe multiple reasons when query profiler cannot work.
#if !USE_UNWIND
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they cannot work without bundled unwind (stack unwinding) library.");
#endif
#if WITH_COVERAGE
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage.");
#endif
@ -1828,7 +1833,7 @@ try
global_context->getProcessList().killAllQueries();
if (current_connections)
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_WARNING(log, "Closed connections. But {} remain."

View File

@ -158,7 +158,6 @@ enum class AccessType
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_DICTIONARY, "SYSTEM RELOAD DICTIONARIES, RELOAD DICTIONARY, RELOAD DICTIONARIES", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \

View File

@ -51,7 +51,8 @@ private:
T value = T{};
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -501,7 +502,8 @@ private:
char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -769,7 +771,7 @@ static_assert(
/// For any other value types.
template <bool IS_NULLABLE = false>
template <bool RESULT_IS_NULLABLE = false>
struct SingleValueDataGeneric
{
private:
@ -779,12 +781,13 @@ private:
bool has_value = false;
public:
static constexpr bool is_nullable = IS_NULLABLE;
static constexpr bool result_is_nullable = RESULT_IS_NULLABLE;
static constexpr bool should_skip_null_arguments = !RESULT_IS_NULLABLE;
static constexpr bool is_any = false;
bool has() const
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
return has_value;
return !value.isNull();
}
@ -820,14 +823,14 @@ public:
void change(const IColumn & column, size_t row_num, Arena *)
{
column.get(row_num, value);
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
void change(const Self & to, Arena *)
{
value = to.value;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
@ -844,7 +847,7 @@ public:
bool changeFirstTime(const Self & to, Arena * arena)
{
if (!has() && (is_nullable || to.has()))
if (!has() && (result_is_nullable || to.has()))
{
change(to, arena);
return true;
@ -879,7 +882,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -910,7 +913,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!has())
{
@ -945,7 +948,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -975,7 +978,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!value.isNull() && (to.value.isNull() || value < to.value))
{
@ -1138,13 +1141,20 @@ struct AggregateFunctionAnyLastData : Data
#endif
};
/** The aggregate function 'singleValueOrNull' is used to implement subquery operators,
* such as x = ALL (SELECT ...)
* It checks if there is only one unique non-NULL value in the data.
* If there is only one unique value - returns it.
* If there are zero or at least two distinct values - returns NULL.
*/
template <typename Data>
struct AggregateFunctionSingleValueOrNullData : Data
{
static constexpr bool is_nullable = true;
using Self = AggregateFunctionSingleValueOrNullData;
static constexpr bool result_is_nullable = true;
bool first_value = true;
bool is_null = false;
@ -1166,7 +1176,7 @@ struct AggregateFunctionSingleValueOrNullData : Data
if (!to.has())
return;
if (first_value)
if (first_value && !to.first_value)
{
first_value = false;
this->change(to, arena);
@ -1311,7 +1321,7 @@ public:
static DataTypePtr createResultType(const DataTypePtr & type_)
{
if constexpr (Data::is_nullable)
if constexpr (Data::result_is_nullable)
return makeNullable(type_);
return type_;
}
@ -1431,13 +1441,13 @@ public:
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function,
const AggregateFunctionPtr & original_function,
const DataTypes & /*arguments*/,
const Array & /*params*/,
const AggregateFunctionProperties & /*properties*/) const override
{
if (Data::is_nullable)
return nested_function;
if (Data::result_is_nullable && !Data::should_skip_null_arguments)
return original_function;
return nullptr;
}

View File

@ -116,7 +116,6 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
/** Query analyzer implementation overview. Please check documentation in QueryAnalysisPass.h first.
@ -4897,11 +4896,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
lambda_expression_untyped->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
if (!parameters.empty())
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_node.formatASTForErrorMessage());
}
auto lambda_expression_clone = lambda_expression_untyped->clone();
IdentifierResolveScope lambda_scope(lambda_expression_clone, &scope /*parent_scope*/);
@ -5018,12 +5012,9 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
}
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, scope.context, parameters);
bool is_executable_udf = false;
if (!function)
function = FunctionFactory::instance().tryGet(function_name, scope.context);
else
is_executable_udf = true;
if (!function)
{
@ -5074,12 +5065,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
return result_projection_names;
}
/// Executable UDFs may have parameters. They are checked in UserDefinedExecutableFunctionFactory.
if (!parameters.empty() && !is_executable_udf)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_name);
}
/** For lambda arguments we need to initialize lambda argument types DataTypeFunction using `getLambdaArgumentTypes` function.
* Then each lambda arguments are initialized with columns, where column source is lambda.
* This information is important for later steps of query processing.

View File

@ -253,6 +253,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
{
return std::make_unique<WriteBufferFromS3>(
client,
client, // already has long timeout
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,

View File

@ -206,11 +206,10 @@ add_library (clickhouse_new_delete STATIC Common/new_delete.cpp)
target_link_libraries (clickhouse_new_delete PRIVATE clickhouse_common_io)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_new_delete PRIVATE ch_contrib::jemalloc)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::jemalloc)
target_link_libraries (clickhouse_storages_system PRIVATE ch_contrib::jemalloc)
endif()
if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_common_io PRIVATE ch_contrib::jemalloc)
endif()
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::sparsehash)
add_subdirectory(Access/Common)

View File

@ -575,9 +575,11 @@ try
}
auto flags = O_WRONLY | O_EXCL;
if (query_with_output->is_outfile_append)
auto file_exists = fs::exists(out_file);
if (file_exists && query_with_output->is_outfile_append)
flags |= O_APPEND;
else if (query_with_output->is_outfile_truncate)
else if (file_exists && query_with_output->is_outfile_truncate)
flags |= O_TRUNC;
else
flags |= O_CREAT;

View File

@ -8,7 +8,7 @@
* See also: https://gcc.gnu.org/legacy-ml/gcc-help/2017-12/msg00021.html
*/
#ifdef NDEBUG
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 128 * (1ULL << 20);
#else
/**
* In debug build, use small mmap threshold to reproduce more memory

View File

@ -0,0 +1,42 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct MemoryTrackerSwitcher
{
explicit MemoryTrackerSwitcher(MemoryTracker * new_tracker)
{
if (!current_thread)
throw Exception(ErrorCodes::LOGICAL_ERROR, "current_thread is not initialized");
auto * thread_tracker = CurrentThread::getMemoryTracker();
prev_untracked_memory = current_thread->untracked_memory;
prev_memory_tracker_parent = thread_tracker->getParent();
current_thread->untracked_memory = 0;
thread_tracker->setParent(new_tracker);
}
~MemoryTrackerSwitcher()
{
CurrentThread::flushUntrackedMemory();
auto * thread_tracker = CurrentThread::getMemoryTracker();
current_thread->untracked_memory = prev_untracked_memory;
thread_tracker->setParent(prev_memory_tracker_parent);
}
MemoryTracker * prev_memory_tracker_parent = nullptr;
Int64 prev_untracked_memory = 0;
};
}
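
A hypothetical usage sketch (the function and tracker here are placeholders, not part of this commit): while the switcher is alive, the current thread's memory accounting is redirected to another tracker, and the destructor restores the previous parent.

```cpp
#include <Common/MemoryTrackerSwitcher.h>

void runWithDedicatedAccounting(MemoryTracker & dedicated_tracker)
{
    // Allocations on this thread are now charged to dedicated_tracker
    // instead of the thread's previous parent tracker.
    DB::MemoryTrackerSwitcher switcher(&dedicated_tracker);

    // ... memory-heavy work ...

}   // destructor flushes untracked memory and restores the previous parent
```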

View File

@ -1,9 +1,11 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <Poco/Timespan.h>
#include <mutex>
#include <type_traits>
#include <variant>
#include <boost/noncopyable.hpp>
#include <Poco/Timespan.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
@ -15,14 +17,6 @@ namespace ProfileEvents
extern const Event ConnectionPoolIsFullMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** A class from which you can inherit and get a pool of something. Used for database connection pools.
* Descendant class must provide a method for creating a new object to place in the pool.
*/
@ -35,6 +29,22 @@ public:
using ObjectPtr = std::shared_ptr<Object>;
using Ptr = std::shared_ptr<PoolBase<TObject>>;
enum class BehaviourOnLimit
{
/**
* Default behaviour: when the limit on pool size is reached, callers wait until an object is returned to the pool.
*/
Wait,
/**
* If there are no free objects in the pool, allocate a new object but do not store it in the pool.
* This behaviour is needed when we simply don't want to waste time waiting, or when we cannot guarantee that a query could be processed using a fixed number of connections.
* For example, when we read from a table on S3, one GetObject request corresponds to a whole FileSystemCache segment. These segments are shared between different
* reading tasks, so in the general case a connection could be taken from the pool by one task and returned by another, and these tasks are processed completely independently.
*/
AllocateNewBypassingPool,
};
private:
/** The object with the flag, whether it is currently used. */
@ -89,37 +99,53 @@ public:
Object & operator*() && = delete;
const Object & operator*() const && = delete;
Object * operator->() & { return &*data->data.object; }
const Object * operator->() const & { return &*data->data.object; }
Object & operator*() & { return *data->data.object; }
const Object & operator*() const & { return *data->data.object; }
Object * operator->() & { return castToObjectPtr(); }
const Object * operator->() const & { return castToObjectPtr(); }
Object & operator*() & { return *castToObjectPtr(); }
const Object & operator*() const & { return *castToObjectPtr(); }
/**
* Expire an object to make it reallocated later.
*/
void expire()
{
data->data.is_expired = true;
if (data.index() == 1)
std::get<1>(data)->data.is_expired = true;
}
bool isNull() const { return data == nullptr; }
PoolBase * getPool() const
{
if (!data)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Attempt to get pool from uninitialized entry");
return &data->data.pool;
}
bool isNull() const { return data.index() == 0 ? !std::get<0>(data) : !std::get<1>(data); }
private:
std::shared_ptr<PoolEntryHelper> data;
/**
* Plain object will be stored instead of PoolEntryHelper if fallback was made in get() (see BehaviourOnLimit::AllocateNewBypassingPool).
*/
std::variant<ObjectPtr, std::shared_ptr<PoolEntryHelper>> data;
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) {}
explicit Entry(ObjectPtr && object) : data(std::move(object)) { }
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
auto castToObjectPtr() const
{
return std::visit(
[](const auto & ptr)
{
using T = std::decay_t<decltype(ptr)>;
if constexpr (std::is_same_v<ObjectPtr, T>)
return ptr.get();
else
return ptr->data.object.get();
},
data);
}
};
virtual ~PoolBase() = default;
/** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
/** Allocates the object.
* If 'behaviour_on_limit' is Wait - wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite.
* If 'behaviour_on_limit' is AllocateNewBypassingPool and there is no free object - a new object will be created but not stored in the pool.
*/
Entry get(Poco::Timespan::TimeDiff timeout)
{
std::unique_lock lock(mutex);
@ -150,6 +176,9 @@ public:
return Entry(*items.back());
}
if (behaviour_on_limit == BehaviourOnLimit::AllocateNewBypassingPool)
return Entry(allocObject());
Stopwatch blocked;
if (timeout < 0)
{
@ -184,6 +213,8 @@ private:
/** The maximum size of the pool. */
unsigned max_items;
BehaviourOnLimit behaviour_on_limit;
/** Pool. */
Objects items;
@ -192,11 +223,10 @@ private:
std::condition_variable available;
protected:
Poco::Logger * log;
PoolBase(unsigned max_items_, Poco::Logger * log_)
: max_items(max_items_), log(log_)
PoolBase(unsigned max_items_, Poco::Logger * log_, BehaviourOnLimit behaviour_on_limit_ = BehaviourOnLimit::Wait)
: max_items(max_items_), behaviour_on_limit(behaviour_on_limit_), log(log_)
{
items.reserve(max_items);
}

View File

@ -368,6 +368,10 @@ The server successfully detected this situation and will download merged part fr
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
M(ReadBufferFromS3ResetSessions, "Number of HTTP sessions that were reset in ReadBufferFromS3.") \
M(ReadBufferFromS3PreservedSessions, "Number of HTTP sessions that were preserved in ReadBufferFromS3.") \
\
M(ReadWriteBufferFromHTTPPreservedSessions, "Number of HTTP sessions that were preserved in ReadWriteBufferFromHTTP.") \
\
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \

View File

@ -91,7 +91,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
#if USE_UNWIND
#ifndef __APPLE__
Timer::Timer()
: log(&Poco::Logger::get("Timer"))
{}
@ -120,6 +120,15 @@ void Timer::createIfNecessary(UInt64 thread_id, int clock_type, int pause_signal
throw Exception(ErrorCodes::CANNOT_CREATE_TIMER, "Failed to create thread timer. The function "
"'timer_create' returned non-zero but didn't set errno. This is bug in your OS.");
/// For example, it cannot be created if the server is run under QEMU:
/// "Failed to create thread timer, errno: 11, strerror: Resource temporarily unavailable."
/// You could accidentally run the server under QEMU without being aware,
/// if you use Docker image for a different architecture,
/// and you have the "binfmt-misc" kernel module, and "qemu-user" tools.
/// Also, it cannot be created if the server has too many threads.
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
}
timer_id.emplace(local_timer_id);
@ -200,13 +209,13 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(UInt64 thread_id, int clock_t
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler disabled because they cannot work under sanitizers");
#elif !USE_UNWIND
#elif defined(__APPLE__)
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work with stock libunwind");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work on OSX");
#else
/// Sanity check.
if (!hasPHDRCache())
@ -255,7 +264,7 @@ QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::cleanup()
{
#if USE_UNWIND
#ifndef __APPLE__
timer.stop();
signal_handler_disarmed = true;
#endif

View File

@ -28,7 +28,7 @@ namespace DB
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCPU.
*/
#if USE_UNWIND
#ifndef __APPLE__
class Timer
{
public:
@ -60,7 +60,7 @@ private:
Poco::Logger * log;
#if USE_UNWIND
#ifndef __APPLE__
inline static thread_local Timer timer = Timer();
#endif

View File

@ -20,13 +20,10 @@
#include <sstream>
#include <unordered_map>
#include <fmt/format.h>
#include <libunwind.h>
#include "config.h"
#if USE_UNWIND
# include <libunwind.h>
#endif
namespace
{
/// Currently this variable is set up once on server startup.
@ -287,12 +284,8 @@ StackTrace::StackTrace(const ucontext_t & signal_context)
void StackTrace::tryCapture()
{
#if USE_UNWIND
size = unw_backtrace(frame_pointers.data(), capacity);
__msan_unpoison(frame_pointers.data(), size * sizeof(frame_pointers[0]));
#else
size = 0;
#endif
}
/// ClickHouse uses bundled libc++ so type names will be the same on every system thus it's safe to hardcode them

View File

@ -793,88 +793,6 @@ public:
}
};
// Searches for needle surrounded by token-separators.
// Separators are anything inside ASCII (0-128) and not alphanum.
// Any value outside of basic ASCII (>=128) is considered a non-separator symbol, hence UTF-8 strings
should work just fine. But any Unicode whitespace is not considered a token separator.
template <typename StringSearcher>
class TokenSearcher : public StringSearcherBase
{
StringSearcher searcher;
size_t needle_size;
public:
template <typename CharT>
requires (sizeof(CharT) == 1)
static bool isValidNeedle(const CharT * needle_, size_t needle_size_)
{
return std::none_of(needle_, needle_ + needle_size_, isTokenSeparator);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
TokenSearcher(const CharT * needle_, size_t needle_size_)
: searcher(needle_, needle_size_)
, needle_size(needle_size_)
{
/// The caller is responsible for calling isValidNeedle()
chassert(isValidNeedle(needle_, needle_size_));
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool compare(const CharT * haystack, const CharT * haystack_end, const CharT * pos) const
{
// use searcher only if pos is in the beginning of token and pos + searcher.needle_size is end of token.
if (isToken(haystack, haystack_end, pos))
return searcher.compare(haystack, haystack_end, pos);
return false;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, const CharT * const haystack_end) const
{
// use searcher.search(), then verify that returned value is a token
// if it is not, skip it and re-run
const auto * pos = haystack;
while (pos < haystack_end)
{
pos = searcher.search(pos, haystack_end);
if (pos == haystack_end || isToken(haystack, haystack_end, pos))
return pos;
// assuming that the needle does not contain any token separators.
pos += needle_size;
}
return haystack_end;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, size_t haystack_size) const
{
return search(haystack, haystack + haystack_size);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool isToken(const CharT * haystack, const CharT * const haystack_end, const CharT* p) const
{
return (p == haystack || isTokenSeparator(*(p - 1)))
&& (p + needle_size >= haystack_end || isTokenSeparator(*(p + needle_size)));
}
ALWAYS_INLINE static bool isTokenSeparator(const uint8_t c)
{
return !(isAlphaNumericASCII(c) || !isASCII(c));
}
};
}
using ASCIICaseSensitiveStringSearcher = impl::StringSearcher<true, true>;
@ -882,9 +800,6 @@ using ASCIICaseInsensitiveStringSearcher = impl::StringSearcher<false, true>;
using UTF8CaseSensitiveStringSearcher = impl::StringSearcher<true, false>;
using UTF8CaseInsensitiveStringSearcher = impl::StringSearcher<false, false>;
using ASCIICaseSensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseSensitiveStringSearcher>;
using ASCIICaseInsensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseInsensitiveStringSearcher>;
/// Use only with short haystacks where cheap initialization is required.
template <bool CaseInsensitive>
struct StdLibASCIIStringSearcher
@ -906,11 +821,11 @@ struct StdLibASCIIStringSearcher
if constexpr (CaseInsensitive)
return std::search(
haystack_start, haystack_end, needle_start, needle_end,
[](char c1, char c2) {return std::toupper(c1) == std::toupper(c2);});
[](char c1, char c2) { return std::toupper(c1) == std::toupper(c2); });
else
return std::search(
haystack_start, haystack_end, needle_start, needle_end,
[](char c1, char c2) {return c1 == c2;});
[](char c1, char c2) { return c1 == c2; });
}
template <typename CharT>

View File

@ -9,7 +9,6 @@
#include <link.h>
//#include <iostream>
#include <filesystem>
#include <base/sort.h>
@ -561,13 +560,6 @@ MultiVersion<SymbolIndex>::Version SymbolIndex::instance()
return instanceImpl().get();
}
void SymbolIndex::reload()
{
instanceImpl().set(std::unique_ptr<SymbolIndex>(new SymbolIndex));
/// Also drop stacktrace cache.
StackTrace::dropCache();
}
}
#endif

View File

@ -24,7 +24,6 @@ protected:
public:
static MultiVersion<SymbolIndex>::Version instance();
static void reload();
struct Symbol
{

View File

@ -199,13 +199,14 @@ ThreadStatus::~ThreadStatus()
if (deleter)
deleter();
chassert(!check_current_thread_on_destruction || current_thread == this);
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViews chain creates and deletes ThreadStatus instances while running in the main query thread
if (check_current_thread_on_destruction)
{
assert(current_thread == this);
if (current_thread == this)
current_thread = nullptr;
}
else if (check_current_thread_on_destruction)
LOG_ERROR(log, "current_thread contains invalid address");
}
void ThreadStatus::updatePerformanceCounters()

View File

@ -730,9 +730,6 @@ using VolnitskyUTF8 = VolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher
using VolnitskyCaseInsensitive = VolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>; /// ignores non-ASCII bytes
using VolnitskyCaseInsensitiveUTF8 = VolnitskyBase<false, false, UTF8CaseInsensitiveStringSearcher>;
using VolnitskyCaseSensitiveToken = VolnitskyBase<true, true, ASCIICaseSensitiveTokenSearcher>;
using VolnitskyCaseInsensitiveToken = VolnitskyBase<false, true, ASCIICaseInsensitiveTokenSearcher>;
using MultiVolnitsky = MultiVolnitskyBase<true, true, ASCIICaseSensitiveStringSearcher>;
using MultiVolnitskyUTF8 = MultiVolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher>;
using MultiVolnitskyCaseInsensitive = MultiVolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>;

View File

@ -9,7 +9,6 @@
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_AZURE_BLOB_STORAGE
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_CASSANDRA
#cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC

View File

@ -145,14 +145,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
return WriteBufferFromS3
{
return WriteBufferFromS3(
s3_client->client,
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
};
);
};
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);

View File

@ -41,7 +41,7 @@
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 180
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 30
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum number of http-connections between two endpoints
/// the number is unmotivated

View File

@ -102,6 +102,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
@ -659,7 +660,7 @@ class IColumn;
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::pread, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool. The 'io_uring' method is experimental and does not work for Log, TinyLog, StripeLog, File, Set and Join, and other tables with append-able files in presence of concurrent reads and writes.", 0) \
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \

View File

@ -80,6 +80,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},

View File

@ -154,7 +154,7 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
writePODBinary(*info, out);
writePODBinary(signal_context, out);
writePODBinary(stack_trace, out);
writeVectorBinary(Exception::thread_frame_pointers, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);
@ -310,6 +310,57 @@ private:
{
ThreadStatus thread_status;
/// First log those fields that are safe to access and that should not cause a new fault.
/// That way we will have some duplicated info in the log, but we don't lose important info
/// in case of a double fault.
LOG_FATAL(log, "########## Short fault info ############");
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) Received signal {}",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, sig);
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "Signal description: {}", signal_description);
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
String bare_stacktrace_str;
if (stack_trace.getSize())
{
/// Write the bare stack trace (addresses) in case we fail to print the symbolized stack trace.
/// NOTE: This still requires memory allocations and a mutex lock inside the logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
bare_stacktrace_str = bare_stacktrace.str();
}
/// Now try to access potentially unsafe data in thread_ptr.
String query_id;
String query;
@ -326,16 +377,6 @@ private:
}
}
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "########################################");
if (query_id.empty())
@ -351,30 +392,11 @@ private:
thread_num, query_id, query, signal_description, sig);
}
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
if (stack_trace.getSize())
if (!bare_stacktrace_str.empty())
{
/// Write the bare stack trace (addresses) in case we fail to print the symbolized stack trace.
/// NOTE: This still requires memory allocations and a mutex lock inside the logger.
/// BTW we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
LOG_FATAL(log, fmt::runtime(bare_stacktrace_str));
}
/// Write symbolized stack trace line by line for better grep-ability.
@ -1101,6 +1123,7 @@ void BaseDaemon::setupWatchdog()
if (0 == pid)
{
updateCurrentThreadIdAfterFork();
logger().information("Forked a child process to watch");
#if defined(OS_LINUX)
if (0 != prctl(PR_SET_PDEATHSIG, SIGKILL))
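The reshuffling above implements two-phase fault logging: everything the handler owns outright (version, thread number, signal, bare stack addresses) is written first, and only then does it touch the potentially corrupted thread state for the query id and query text, so a second fault still leaves the essentials in the log. The shape of the pattern as a standalone sketch (illustrative only; a production handler is further constrained to async-signal-safe calls, which fprintf is not):

#include <cstdio>

struct MaybeCorrupted { const char * query_id; };

void onFatalSignal(int sig, const MaybeCorrupted * thread_info)
{
    // Phase 1: data owned by the handler itself; duplicated later,
    // but it survives a double fault.
    std::fprintf(stderr, "### Short fault info: received signal %d\n", sig);

    // Phase 2: potentially unsafe pointers; may fault again,
    // but phase 1 is already out.
    std::fprintf(stderr, "query_id: %s\n", thread_info ? thread_info->query_id : "unknown");
}

int main()
{
    MaybeCorrupted info{"test-query"};
    onFatalSignal(11, &info);
}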

View File

@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -182,6 +183,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
auto ast = parseQueryFromMetadata(log, getContext(), full_path.string(), /*throw_on_error*/ true, /*remove_empty*/ false);
if (ast)
{
FunctionNameNormalizer().visit(ast.get());
auto * create_query = ast->as<ASTCreateQuery>();
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name));

View File

@ -549,16 +549,17 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
for (size_t i = 0; i < key_index_to_state_from_storage.size(); ++i)
{
if (key_index_to_state_from_storage[i].isExpired()
|| key_index_to_state_from_storage[i].isNotFound())
if (key_index_to_state_from_storage[i].isExpired() || key_index_to_state_from_storage[i].isNotFound())
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
requested_keys_vector.emplace_back(requested_keys[i]);
else
requested_complex_key_rows.emplace_back(i);
auto requested_key = requested_keys[i];
not_found_keys.insert(requested_key);
auto [_, inserted] = not_found_keys.insert(requested_key);
if (inserted)
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
requested_keys_vector.emplace_back(requested_keys[i]);
else
requested_complex_key_rows.emplace_back(i);
}
}
}
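The fix above hinges on `insert` reporting whether the key was actually new, so a key requested several times in one batch is fetched only once. The idiom in isolation (a standalone sketch, not dictionary code):

#include <iostream>
#include <unordered_set>
#include <vector>

int main()
{
    std::vector<int> requested_keys{1, 2, 2, 3, 1};
    std::unordered_set<int> not_found_keys;
    std::vector<int> keys_to_fetch;

    for (int key : requested_keys)
    {
        auto [_, inserted] = not_found_keys.insert(key);
        if (inserted)                    // first sighting of this key only
            keys_to_fetch.push_back(key);
    }
    std::cout << keys_to_fetch.size() << '\n'; // 3, not 5
}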

View File

@ -266,7 +266,7 @@ public:
}
UInt64 getSize() const override { return reservation->getSize(); }
UInt64 getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
std::optional<UInt64> getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
DiskPtr getDisk(size_t i) const override
{

View File

@ -312,17 +312,17 @@ public:
}
}
UInt64 getTotalSpace() const override
std::optional<UInt64> getTotalSpace() const override
{
return delegate->getTotalSpace();
}
UInt64 getAvailableSpace() const override
std::optional<UInt64> getAvailableSpace() const override
{
return delegate->getAvailableSpace();
}
UInt64 getUnreservedSpace() const override
std::optional<UInt64> getUnreservedSpace() const override
{
return delegate->getUnreservedSpace();
}

View File

@ -78,7 +78,7 @@ public:
{}
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override
{
@ -175,8 +175,11 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(DiskLocal::reservation_mutex);
UInt64 available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space
? *available_space - std::min(*available_space, reserved_bytes)
: std::numeric_limits<UInt64>::max();
if (bytes == 0)
{
@ -187,12 +190,24 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
if (unreserved_space >= bytes)
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}, having unreserved {}.",
ReadableSize(bytes),
backQuote(name),
ReadableSize(unreserved_space));
if (available_space)
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}, having unreserved {}.",
ReadableSize(bytes),
backQuote(name),
ReadableSize(unreserved_space));
}
else
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}.",
ReadableSize(bytes),
backQuote(name));
}
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
@ -218,14 +233,14 @@ static UInt64 getTotalSpaceByName(const String & name, const String & disk_path,
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getTotalSpace() const
std::optional<UInt64> DiskLocal::getTotalSpace() const
{
if (broken || readonly)
return 0;
return getTotalSpaceByName(name, disk_path, keep_free_space_bytes);
}
UInt64 DiskLocal::getAvailableSpace() const
std::optional<UInt64> DiskLocal::getAvailableSpace() const
{
if (broken || readonly)
return 0;
@ -242,10 +257,10 @@ UInt64 DiskLocal::getAvailableSpace() const
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getUnreservedSpace() const
std::optional<UInt64> DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(DiskLocal::reservation_mutex);
auto available_space = getAvailableSpace();
auto available_space = *getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}
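With the disk-space getters switched to `std::optional<UInt64>`, an empty optional now means "no limit is known" (e.g. a web-backed disk), and callers such as `tryReserve` above branch on it instead of doing raw arithmetic. The core computation as a standalone sketch:

#include <algorithm>
#include <cstdint>
#include <iostream>
#include <limits>
#include <optional>

using UInt64 = std::uint64_t;

// nullopt = the backing storage cannot report a limit.
UInt64 unreservedSpace(std::optional<UInt64> available, UInt64 reserved)
{
    if (!available)
        return std::numeric_limits<UInt64>::max();      // unlimited: any reservation fits
    return *available - std::min(*available, reserved); // clamped, never underflows
}

int main()
{
    std::cout << unreservedSpace(1000, 300) << '\n';         // 700
    std::cout << unreservedSpace(1000, 5000) << '\n';        // 0: over-reserved, clamped
    std::cout << unreservedSpace(std::nullopt, 300) << '\n'; // 18446744073709551615
}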

View File

@ -35,11 +35,9 @@ public:
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
std::optional<UInt64> getTotalSpace() const override;
std::optional<UInt64> getAvailableSpace() const override;
std::optional<UInt64> getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }

View File

@ -140,13 +140,13 @@ public:
const String & getName() const override { return name; }
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
virtual std::optional<UInt64> getTotalSpace() const = 0;
/// Space currently available on the disk.
virtual UInt64 getAvailableSpace() const = 0;
virtual std::optional<UInt64> getAvailableSpace() const = 0;
/// Space available for reservation (available space minus reserved space).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Amount of bytes which should be kept free on the disk.
virtual UInt64 getKeepingFreeSpace() const { return 0; }
@ -495,7 +495,7 @@ public:
/// Space available for reservation
/// (with this reservation already take into account).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Get i-th disk where reservation take place.
virtual DiskPtr getDisk(size_t i = 0) const = 0; /// NOLINT

View File

@ -42,23 +42,17 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}
static size_t chooseBufferSize(const ReadSettings & settings, size_t file_size)
{
/// Buffers used for prefetch or pre-download should be large enough, but not bigger than the whole file.
return std::min<size_t>(std::max<size_t>(settings.prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
}
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(chooseBufferSize(settings_, impl_->getFileSize()), nullptr, 0)
: ReadBufferFromFileBase(chooseBufferSizeForRemoteReading(settings_, impl_->getFileSize()), nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, reader(reader_)
, prefetch_buffer(chooseBufferSize(settings_, impl->getFileSize()))
, prefetch_buffer(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()))
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
@ -111,7 +105,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
last_prefetch_info.submit_time = std::chrono::system_clock::now();
last_prefetch_info.priority = priority;
chassert(prefetch_buffer.size() == chooseBufferSize(read_settings, impl->getFileSize()));
chassert(prefetch_buffer.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -190,7 +184,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
chassert(memory.size() == chooseBufferSize(read_settings, impl->getFileSize()));
chassert(memory.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);

View File

@ -74,22 +74,19 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { range.left, range.right },
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -498,7 +495,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(*current_file_segment, read_type);
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -521,7 +518,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front(), read_type);
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}
}
@ -1090,6 +1087,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
first_offset,
file_segments->toString());
/// Release buffer a little bit earlier.
if (read_until_position == file_offset_of_buffer_end)
implementation_buffer.reset();
return result;
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -2,14 +2,27 @@
#include <IO/SeekableReadBuffer.h>
#include <iostream>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Common/logger_useful.h>
#include <IO/ReadSettings.h>
#include <IO/SwapHelper.h>
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <base/hex.h>
#include <Common/logger_useful.h>
using namespace DB;
namespace
{
bool withCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
}
namespace DB
{
@ -18,29 +31,35 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when the cache is used can we download bigger portions of FileSegments than we are actually going to read within a particular task.
if (!withCache(settings))
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download should be large enough, but not bigger than the whole file.
return std::min<size_t>(std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
: ReadBufferFromFileBase(
use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_cache(withCache(settings))
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!query_id.empty()
|| settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
@ -90,8 +109,6 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -73,7 +73,7 @@ private:
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
bool with_cache;
const bool with_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;
@ -86,4 +86,5 @@ private:
Poco::Logger * log;
};
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size);
}
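`chooseBufferSizeForRemoteReading` grows the buffer beyond `remote_fs_buffer_size` only when the cache is in play, because only then can a read pre-download more than the task itself consumes, and it always caps at the file size. The clamp in isolation (constants are illustrative stand-ins for the real settings):

#include <algorithm>
#include <cstddef>
#include <iostream>

constexpr size_t remote_fs_buffer_size = 1 << 20; // illustrative configured size
constexpr size_t default_buffer_size   = 1 << 20; // stand-in for DBMS_DEFAULT_BUFFER_SIZE

size_t chooseBufferSize(bool with_cache, size_t file_size)
{
    if (!with_cache)
        return remote_fs_buffer_size; // plain remote read: no reason to over-allocate
    // A cached read may pre-download whole file segments: use a big buffer,
    // but never one bigger than the file itself.
    return std::min(std::max(remote_fs_buffer_size, default_buffer_size), file_size);
}

int main()
{
    std::cout << chooseBufferSize(true, 4096) << '\n';      // 4096: the tiny file wins
    std::cout << chooseBufferSize(true, 100 << 20) << '\n'; // 1048576: capped by buffer size
}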

View File

@ -49,11 +49,18 @@ IVolume::IVolume(
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Volume must contain at least one disk");
}
UInt64 IVolume::getMaxUnreservedFreeSpace() const
std::optional<UInt64> IVolume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
std::optional<UInt64> res;
for (const auto & disk : disks)
res = std::max(res, disk->getUnreservedSpace());
{
auto disk_unreserved_space = disk->getUnreservedSpace();
if (!disk_unreserved_space)
return std::nullopt; /// There is at least one unlimited disk.
if (!res || *disk_unreserved_space > *res)
res = disk_unreserved_space;
}
return res;
}

View File

@ -74,7 +74,7 @@ public:
virtual VolumeType getType() const = 0;
/// Return biggest unreserved space across all disks
UInt64 getMaxUnreservedFreeSpace() const;
std::optional<UInt64> getMaxUnreservedFreeSpace() const;
DiskPtr getDisk() const { return getDisk(0); }
virtual DiskPtr getDisk(size_t i) const { return disks[i]; }

View File

@ -410,18 +410,25 @@ void DiskObjectStorage::removeSharedRecursive(
transaction->commit();
}
std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
bool DiskObjectStorage::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (!available_space)
{
++reservation_count;
reserved_bytes += bytes;
return true;
}
UInt64 unreserved_space = *available_space - std::min(*available_space, reserved_bytes);
if (bytes == 0)
{
LOG_TRACE(log, "Reserved 0 bytes on remote disk {}", backQuote(name));
++reservation_count;
return {unreserved_space};
return true;
}
if (unreserved_space >= bytes)
@ -434,14 +441,14 @@ std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
return true;
}
else
{
LOG_TRACE(log, "Could not reserve {} on remote disk {}. Not enough unreserved space", ReadableSize(bytes), backQuote(name));
}
return {};
return false;
}
bool DiskObjectStorage::supportsCache() const

View File

@ -53,11 +53,9 @@ public:
const std::string & getCacheName() const override { return object_storage->getCacheName(); }
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
std::optional<UInt64> getTotalSpace() const override { return {}; }
std::optional<UInt64> getAvailableSpace() const override { return {}; }
std::optional<UInt64> getUnreservedSpace() const override { return {}; }
UInt64 getKeepingFreeSpace() const override { return 0; }
@ -224,7 +222,7 @@ private:
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::optional<UInt64> tryReserve(UInt64 bytes);
bool tryReserve(UInt64 bytes);
const bool send_metadata;
@ -244,7 +242,7 @@ public:
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override;

View File

@ -149,7 +149,7 @@ private:
bool S3ObjectStorage::exists(const StoredObject & object) const
{
auto settings_ptr = s3_settings.get();
return S3::objectExists(*client.get(), bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
return S3::objectExists(*clients.get()->client, bucket, object.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
}
std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
@ -168,7 +168,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
(const std::string & path, size_t read_until_position) -> std::unique_ptr<ReadBufferFromFileBase>
{
return std::make_unique<ReadBufferFromS3>(
client.get(),
clients.get()->client,
bucket,
path,
version_id,
@ -218,7 +218,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObject( /// NOLINT
{
auto settings_ptr = s3_settings.get();
return std::make_unique<ReadBufferFromS3>(
client.get(),
clients.get()->client,
bucket,
object.remote_path,
version_id,
@ -243,8 +243,10 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto clients_ = clients.get();
return std::make_unique<WriteBufferFromS3>(
client.get(),
clients_->client,
clients_->client_with_long_timeout,
bucket,
object.remote_path,
buf_size,
@ -258,7 +260,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefix) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
return std::make_shared<S3IteratorAsync>(bucket, path_prefix, client_ptr, settings_ptr->list_object_keys_size);
}
@ -266,7 +268,7 @@ ObjectStorageIteratorPtr S3ObjectStorage::iterate(const std::string & path_prefi
void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const
{
auto settings_ptr = s3_settings.get();
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
S3::ListObjectsV2Request request;
request.SetBucket(bucket);
@ -307,7 +309,7 @@ void S3ObjectStorage::listObjects(const std::string & path, RelativePathsWithMet
void S3ObjectStorage::removeObjectImpl(const StoredObject & object, bool if_exists)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
ProfileEvents::increment(ProfileEvents::S3DeleteObjects);
ProfileEvents::increment(ProfileEvents::DiskS3DeleteObjects);
@ -333,7 +335,7 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
}
else
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
size_t chunk_size_limit = settings_ptr->objects_chunk_size_to_delete;
@ -394,7 +396,7 @@ void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true, /* throw_on_error= */ false);
if (object_info.size == 0 && object_info.last_modification_time == 0 && object_info.metadata.empty())
return {};
@ -410,7 +412,7 @@ std::optional<ObjectMetadata> S3ObjectStorage::tryGetObjectMetadata(const std::s
ObjectMetadata S3ObjectStorage::getObjectMetadata(const std::string & path) const
{
auto settings_ptr = s3_settings.get();
auto object_info = S3::getObjectInfo(*client.get(), bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
auto object_info = S3::getObjectInfo(*clients.get()->client, bucket, path, {}, settings_ptr->request_settings, /* with_metadata= */ true, /* for_disk_s3= */ true);
ObjectMetadata result;
result.size_bytes = object_info.size;
@ -429,7 +431,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
/// Shortcut for S3
if (auto * dest_s3 = dynamic_cast<S3ObjectStorage * >(&object_storage_to); dest_s3 != nullptr)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
@ -445,7 +447,7 @@ void S3ObjectStorage::copyObjectToAnotherObjectStorage( // NOLINT
void S3ObjectStorage::copyObject( // NOLINT
const StoredObject & object_from, const StoredObject & object_to, std::optional<ObjectAttributes> object_to_attributes)
{
auto client_ptr = client.get();
auto client_ptr = clients.get()->client;
auto settings_ptr = s3_settings.get();
auto size = S3::getObjectSize(*client_ptr, bucket, object_from.remote_path, {}, settings_ptr->request_settings, /* for_disk_s3= */ true);
auto scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "S3ObjStor_copy");
@ -458,35 +460,33 @@ void S3ObjectStorage::setNewSettings(std::unique_ptr<S3ObjectStorageSettings> &&
s3_settings.set(std::move(s3_settings_));
}
void S3ObjectStorage::setNewClient(std::unique_ptr<S3::Client> && client_)
{
client.set(std::move(client_));
}
void S3ObjectStorage::shutdown()
{
auto client_ptr = client.get();
auto clients_ptr = clients.get();
/// This call stops any further retry attempts for ongoing S3 requests.
/// If an S3 request has failed and the method below is executed, the S3 client immediately returns the last failed S3 request outcome.
/// If S3 is healthy, nothing bad will happen and S3 requests will be processed in the regular way without errors.
/// This should significantly speed up the shutdown process if S3 is unhealthy.
const_cast<S3::Client &>(*client_ptr).DisableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client).DisableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).DisableRequestProcessing();
}
void S3ObjectStorage::startup()
{
auto client_ptr = client.get();
auto clients_ptr = clients.get();
/// Need to be enabled if it was disabled during shutdown() call.
const_cast<S3::Client &>(*client_ptr).EnableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client).EnableRequestProcessing();
const_cast<S3::Client &>(*clients_ptr->client_with_long_timeout).EnableRequestProcessing();
}
void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration & config, const std::string & config_prefix, ContextPtr context)
{
auto new_s3_settings = getSettings(config, config_prefix, context);
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
auto new_clients = std::make_unique<Clients>(std::move(new_client), *new_s3_settings);
s3_settings.set(std::move(new_s3_settings));
client.set(std::move(new_client));
clients.set(std::move(new_clients));
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
@ -501,7 +501,9 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
endpoint);
}
S3ObjectStorage::Clients::Clients(std::shared_ptr<S3::Client> client_, const S3ObjectStorageSettings & settings)
: client(std::move(client_)), client_with_long_timeout(client->clone(std::nullopt, settings.request_settings.long_request_timeout_ms)) {}
}
#endif

View File

@ -39,6 +39,16 @@ struct S3ObjectStorageSettings
class S3ObjectStorage : public IObjectStorage
{
public:
struct Clients
{
std::shared_ptr<S3::Client> client;
std::shared_ptr<S3::Client> client_with_long_timeout;
Clients() = default;
Clients(std::shared_ptr<S3::Client> client, const S3ObjectStorageSettings & settings);
};
private:
friend class S3PlainObjectStorage;
@ -51,7 +61,7 @@ private:
String bucket_,
String connection_string)
: bucket(bucket_)
, client(std::move(client_))
, clients(std::make_unique<Clients>(std::move(client_), *s3_settings_))
, s3_settings(std::move(s3_settings_))
, s3_capabilities(s3_capabilities_)
, version_id(std::move(version_id_))
@ -159,14 +169,12 @@ public:
private:
void setNewSettings(std::unique_ptr<S3ObjectStorageSettings> && s3_settings_);
void setNewClient(std::unique_ptr<S3::Client> && client_);
void removeObjectImpl(const StoredObject & object, bool if_exists);
void removeObjectsImpl(const StoredObjects & objects, bool if_exists);
std::string bucket;
MultiVersion<S3::Client> client;
MultiVersion<Clients> clients;
MultiVersion<S3ObjectStorageSettings> s3_settings;
S3Capabilities s3_capabilities;

View File

@ -129,9 +129,12 @@ std::unique_ptr<S3::Client> getClient(
throw Exception(ErrorCodes::BAD_ARGUMENTS, "S3 path must ends with '/', but '{}' doesn't.", uri.key);
client_configuration.connectTimeoutMs = config.getUInt(config_prefix + ".connect_timeout_ms", 1000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 30000);
client_configuration.requestTimeoutMs = config.getUInt(config_prefix + ".request_timeout_ms", 3000);
client_configuration.maxConnections = config.getUInt(config_prefix + ".max_connections", 100);
client_configuration.endpointOverride = uri.endpoint;
client_configuration.http_keep_alive_timeout_ms = config.getUInt(config_prefix + ".http_keep_alive_timeout_ms", 10000);
client_configuration.http_connection_pool_size = config.getUInt(config_prefix + ".http_connection_pool_size", 1000);
client_configuration.wait_on_pool_size_limit = false;
auto proxy_config = getProxyConfiguration(config_prefix, config);
if (proxy_config)

View File

@ -209,10 +209,17 @@ DiskPtr StoragePolicy::tryGetDiskByName(const String & disk_name) const
UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
std::optional<UInt64> res;
for (const auto & volume : volumes)
res = std::max(res, volume->getMaxUnreservedFreeSpace());
return res;
{
auto volume_unreserved_space = volume->getMaxUnreservedFreeSpace();
if (!volume_unreserved_space)
return -1ULL; /// There is at least one unlimited disk.
if (!res || *volume_unreserved_space > *res)
res = volume_unreserved_space;
}
return res.value_or(-1ULL);
}
@ -248,22 +255,37 @@ ReservationPtr StoragePolicy::reserveAndCheck(UInt64 bytes) const
ReservationPtr StoragePolicy::makeEmptyReservationOnLargestDisk() const
{
UInt64 max_space = 0;
bool found_bottomless_disk = false;
DiskPtr max_disk;
for (const auto & volume : volumes)
{
for (const auto & disk : volume->getDisks())
{
auto avail_space = disk->getAvailableSpace();
if (avail_space > max_space)
auto available_space = disk->getAvailableSpace();
if (!available_space)
{
max_space = avail_space;
max_disk = disk;
found_bottomless_disk = true;
break;
}
if (*available_space > max_space)
{
max_space = *available_space;
max_disk = disk;
}
}
if (found_bottomless_disk)
break;
}
if (!max_disk)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "There is no space on any disk in storage policy: {}. "
"It's likely all disks are broken", name);
auto reservation = max_disk->reserve(0);
if (!reservation)
{

View File

@ -40,20 +40,28 @@ VolumeJBOD::VolumeJBOD(
auto ratio = config.getDouble(config_prefix + ".max_data_part_size_ratio");
if (ratio < 0)
throw Exception(ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG, "'max_data_part_size_ratio' must not be less than 0.");
UInt64 sum_size = 0;
std::vector<UInt64> sizes;
for (const auto & disk : disks)
{
sizes.push_back(disk->getTotalSpace());
sum_size += sizes.back();
auto size = disk->getTotalSpace();
if (size)
sum_size += *size;
else
break;
sizes.push_back(*size);
}
max_data_part_size = static_cast<decltype(max_data_part_size)>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
if (sizes.size() == disks.size())
{
if (sizes[i] < max_data_part_size)
max_data_part_size = static_cast<UInt64>(sum_size * ratio / disks.size());
for (size_t i = 0; i < disks.size(); ++i)
{
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
if (sizes[i] < max_data_part_size)
{
LOG_WARNING(logger, "Disk {} on volume {} have not enough space ({}) for containing part the size of max_data_part_size ({})",
backQuote(disks[i]->getName()), backQuote(config_prefix), ReadableSize(sizes[i]), ReadableSize(max_data_part_size));
}
}
}
}

View File

@ -68,7 +68,7 @@ private:
struct DiskWithSize
{
DiskPtr disk;
uint64_t free_size = 0;
std::optional<UInt64> free_size = 0;
DiskWithSize(DiskPtr disk_)
: disk(disk_)
@ -80,7 +80,7 @@ private:
return free_size < rhs.free_size;
}
ReservationPtr reserve(uint64_t bytes)
ReservationPtr reserve(UInt64 bytes)
{
ReservationPtr reservation = disk->reserve(bytes);
if (!reservation)

View File

@ -56,7 +56,7 @@ void loadDiskLocalConfig(const String & name,
tmp_path = context->getPath();
// Create tmp disk for getting total disk space.
keep_free_space_bytes = static_cast<UInt64>(DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
keep_free_space_bytes = static_cast<UInt64>(*DiskLocal("tmp", tmp_path, 0, config, config_prefix).getTotalSpace() * ratio);
}
}

View File

@ -155,7 +155,6 @@ public:
if (!((executeType<UInt8>(result_column, arguments, input_rows_count))
|| (executeType<UInt16>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt32>(result_column, arguments, input_rows_count))
|| (executeType<UInt64>(result_column, arguments, input_rows_count))
|| (executeType<Int8>(result_column, arguments, input_rows_count))
|| (executeType<Int16>(result_column, arguments, input_rows_count))

View File

@ -17,7 +17,7 @@ namespace ErrorCodes
/** Token search in the string: the needle must be surrounded by separator chars, like whitespace or punctuation.
*/
template <typename Name, typename TokenSearcher, bool negate>
template <typename Name, typename Searcher, bool negate>
struct HasTokenImpl
{
using ResultType = UInt8;
@ -46,7 +46,7 @@ struct HasTokenImpl
const UInt8 * const end = haystack_data.data() + haystack_data.size();
const UInt8 * pos = begin;
if (!ASCIICaseSensitiveTokenSearcher::isValidNeedle(pattern.data(), pattern.size()))
if (!std::none_of(pattern.begin(), pattern.end(), isTokenSeparator))
{
if (res_null)
{
@ -58,7 +58,8 @@ struct HasTokenImpl
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Needle must not contain whitespace or separator characters");
}
TokenSearcher searcher(pattern.data(), pattern.size(), end - pos);
size_t pattern_size = pattern.size();
Searcher searcher(pattern.data(), pattern_size, end - pos);
if (res_null)
std::ranges::fill(res_null->getData(), false);
@ -67,21 +68,31 @@ struct HasTokenImpl
/// We will search for the next occurrence in all rows at once.
while (pos < end && end != (pos = searcher.search(pos, end - pos)))
{
/// Let's determine which index it refers to.
while (begin + haystack_offsets[i] <= pos)
/// The found substring is a token
if ((pos == begin || isTokenSeparator(pos[-1]))
&& (pos + pattern_size == end || isTokenSeparator(pos[pattern_size])))
{
res[i] = negate;
/// Let's determine which index it refers to.
while (begin + haystack_offsets[i] <= pos)
{
res[i] = negate;
++i;
}
/// We check that the entry does not pass through the boundaries of strings.
if (pos + pattern.size() < begin + haystack_offsets[i])
res[i] = !negate;
else
res[i] = negate;
pos = begin + haystack_offsets[i];
++i;
}
/// We check that the entry does not pass through the boundaries of strings.
if (pos + pattern.size() < begin + haystack_offsets[i])
res[i] = !negate;
else
res[i] = negate;
pos = begin + haystack_offsets[i];
++i;
else
{
/// Not a token. Jump over it.
pos += pattern_size;
}
}
/// Tail, in which there can be no substring.
@ -113,6 +124,12 @@ struct HasTokenImpl
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Function '{}' doesn't support FixedString haystack argument", name);
}
private:
static bool isTokenSeparator(UInt8 c)
{
return isASCII(c) && !isAlphaNumericASCII(c);
}
};
}
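After this change a plain `Volnitsky` searcher finds candidate substrings and `HasTokenImpl` verifies the token boundaries itself: a hit counts only when both neighbours are ASCII separators or string boundaries, and non-token hits are jumped over. The boundary test extracted into a standalone sketch (assuming, as the code above enforces, a non-empty needle with no separators in it):

#include <cctype>
#include <iostream>
#include <string_view>

static bool isTokenSeparator(unsigned char c)
{
    return c < 0x80 && !std::isalnum(c); // ASCII and not alphanumeric
}

// True if `needle` occurs in `hay` as a whole token.
bool hasToken(std::string_view hay, std::string_view needle)
{
    for (size_t pos = hay.find(needle); pos != std::string_view::npos;
         pos = hay.find(needle, pos + needle.size())) // jump over non-token hits
    {
        bool left_ok  = pos == 0 || isTokenSeparator(hay[pos - 1]);
        bool right_ok = pos + needle.size() == hay.size()
                     || isTokenSeparator(hay[pos + needle.size()]);
        if (left_ok && right_ok)
            return true;
    }
    return false;
}

int main()
{
    std::cout << hasToken("foo,bar baz", "bar") << '\n'; // 1
    std::cout << hasToken("foobar baz", "bar") << '\n';  // 0: not separated
}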

View File

@ -20,7 +20,6 @@ namespace DB
namespace ErrorCodes
{
extern const int UNSUPPORTED_METHOD;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
void UserDefinedSQLFunctionVisitor::visit(ASTPtr & ast)
@ -139,12 +138,6 @@ ASTPtr UserDefinedSQLFunctionVisitor::tryToReplaceFunction(const ASTFunction & f
if (!user_defined_function)
return nullptr;
/// All UDFs are not parametric for now.
if (function.parameters)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function.name);
}
const auto & function_arguments_list = function.children.at(0)->as<ASTExpressionList>();
auto & function_arguments = function_arguments_list->children;

View File

@ -22,19 +22,19 @@ namespace
struct FilesystemAvailable
{
static constexpr auto name = "filesystemAvailable";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getAvailableSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getAvailableSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
struct FilesystemUnreserved
{
static constexpr auto name = "filesystemUnreserved";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getUnreservedSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getUnreservedSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
struct FilesystemCapacity
{
static constexpr auto name = "filesystemCapacity";
static std::uintmax_t get(const DiskPtr & disk) { return disk->getTotalSpace(); }
static UInt64 get(const DiskPtr & disk) { return disk->getTotalSpace().value_or(std::numeric_limits<UInt64>::max()); }
};
template <typename Impl>

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <string>
@ -16,7 +17,6 @@ namespace DB
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_COLUMN;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
@ -37,7 +37,6 @@ public:
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {2}; }
bool useDefaultImplementationForConstants() const override { return true; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
@ -58,14 +57,25 @@ public:
return std::make_shared<DataTypeString>();
}
template <typename LonType, typename LatType>
bool tryExecute(const IColumn * lon_column, const IColumn * lat_column, UInt64 precision_value, ColumnPtr & result) const
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const ColumnVector<LonType> * longitude = checkAndGetColumn<ColumnVector<LonType>>(lon_column);
const ColumnVector<LatType> * latitude = checkAndGetColumn<ColumnVector<LatType>>(lat_column);
if (!latitude || !longitude)
return false;
const IColumn * longitude = arguments[0].column.get();
const IColumn * latitude = arguments[1].column.get();
ColumnPtr precision;
if (arguments.size() < 3)
precision = DataTypeUInt8().createColumnConst(longitude->size(), GEOHASH_MAX_TEXT_LENGTH);
else
precision = arguments[2].column;
ColumnPtr res_column;
vector(longitude, latitude, precision.get(), res_column);
return res_column;
}
private:
void vector(const IColumn * lon_column, const IColumn * lat_column, const IColumn * precision_column, ColumnPtr & result) const
{
auto col_str = ColumnString::create();
ColumnString::Chars & out_vec = col_str->getChars();
ColumnString::Offsets & out_offsets = col_str->getOffsets();
@ -80,8 +90,9 @@ public:
for (size_t i = 0; i < size; ++i)
{
const Float64 longitude_value = longitude->getElement(i);
const Float64 latitude_value = latitude->getElement(i);
const Float64 longitude_value = lon_column->getFloat64(i);
const Float64 latitude_value = lat_column->getFloat64(i);
const UInt64 precision_value = std::min<UInt64>(precision_column->get64(i), GEOHASH_MAX_TEXT_LENGTH);
const size_t encoded_size = geohashEncode(longitude_value, latitude_value, precision_value, pos);
@ -95,37 +106,6 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Column size mismatch (internal logical error)");
result = std::move(col_str);
return true;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const IColumn * longitude = arguments[0].column.get();
const IColumn * latitude = arguments[1].column.get();
const UInt64 precision_value = std::min<UInt64>(GEOHASH_MAX_TEXT_LENGTH,
arguments.size() == 3 ? arguments[2].column->get64(0) : GEOHASH_MAX_TEXT_LENGTH);
ColumnPtr res_column;
if (tryExecute<Float32, Float32>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float64, Float32>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float32, Float64>(longitude, latitude, precision_value, res_column) ||
tryExecute<Float64, Float64>(longitude, latitude, precision_value, res_column))
return res_column;
std::string arguments_description;
for (size_t i = 0; i < arguments.size(); ++i)
{
if (i != 0)
arguments_description += ", ";
arguments_description += arguments[i].column->getName();
}
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Unsupported argument types: {} for function {}",
arguments_description, getName());
}
};
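The rewrite replaces the four `tryExecute<LonType, LatType>` template instantiations with the type-erased `getFloat64`/`get64` accessors, which is also what makes the precision argument per-row. A minimal model of that trade-off with a hand-rolled column interface (not ClickHouse's IColumn):

#include <cstddef>
#include <iostream>
#include <vector>

// Type-erased column: one virtual call per element replaces
// one template instantiation per concrete element type.
struct IColumnLike
{
    virtual ~IColumnLike() = default;
    virtual double getFloat64(size_t i) const = 0;
    virtual size_t size() const = 0;
};

template <typename T>
struct ColumnVectorLike : IColumnLike
{
    std::vector<T> data;
    double getFloat64(size_t i) const override { return static_cast<double>(data[i]); }
    size_t size() const override { return data.size(); }
};

double sumAsFloat64(const IColumnLike & col)
{
    double sum = 0;
    for (size_t i = 0; i < col.size(); ++i)
        sum += col.getFloat64(i); // works for Float32 and Float64 columns alike
    return sum;
}

int main()
{
    ColumnVectorLike<float> f32;  f32.data = {1.5f, 2.5f};
    ColumnVectorLike<double> f64; f64.data = {1.5, 2.5};
    std::cout << sumAsFloat64(f32) << ' ' << sumAsFloat64(f64) << '\n'; // 4 4
}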

View File

@ -6,6 +6,7 @@
namespace DB
{
struct NameHasToken
{
static constexpr auto name = "hasToken";
@ -17,9 +18,9 @@ struct NameHasTokenOrNull
};
using FunctionHasToken
= FunctionsStringSearch<HasTokenImpl<NameHasToken, VolnitskyCaseSensitiveToken, false>>;
= FunctionsStringSearch<HasTokenImpl<NameHasToken, Volnitsky, false>>;
using FunctionHasTokenOrNull
= FunctionsStringSearch<HasTokenImpl<NameHasTokenOrNull, VolnitskyCaseSensitiveToken, false>, ExecutionErrorPolicy::Null>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenOrNull, Volnitsky, false>, ExecutionErrorPolicy::Null>;
REGISTER_FUNCTION(HasToken)
{

View File

@ -6,6 +6,7 @@
namespace DB
{
struct NameHasTokenCaseInsensitive
{
static constexpr auto name = "hasTokenCaseInsensitive";
@ -17,9 +18,9 @@ struct NameHasTokenCaseInsensitiveOrNull
};
using FunctionHasTokenCaseInsensitive
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitive, VolnitskyCaseInsensitiveToken, false>>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitive, VolnitskyCaseInsensitive, false>>;
using FunctionHasTokenCaseInsensitiveOrNull
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitiveOrNull, VolnitskyCaseInsensitiveToken, false>, ExecutionErrorPolicy::Null>;
= FunctionsStringSearch<HasTokenImpl<NameHasTokenCaseInsensitiveOrNull, VolnitskyCaseInsensitive, false>, ExecutionErrorPolicy::Null>;
REGISTER_FUNCTION(HasTokenCaseInsensitive)
{

View File

@ -1,8 +1,10 @@
#include <IO/HTTPCommon.h>
#include <Server/HTTP/HTTPServerResponse.h>
#include <Poco/Any.h>
#include <Common/DNSResolver.h>
#include <Common/Exception.h>
#include <Common/MemoryTrackerSwitcher.h>
#include <Common/PoolBase.h>
#include <Common/ProfileEvents.h>
#include <Common/SipHash.h>
@ -40,6 +42,7 @@ namespace ErrorCodes
extern const int RECEIVED_ERROR_TOO_MANY_REQUESTS;
extern const int FEATURE_IS_NOT_ENABLED_AT_BUILD_TIME;
extern const int UNSUPPORTED_URI_SCHEME;
extern const int LOGICAL_ERROR;
}
@ -107,6 +110,9 @@ namespace
ObjectPtr allocObject() override
{
/// Pool is global, we shouldn't attribute this memory to query/user.
MemoryTrackerSwitcher switcher{&total_memory_tracker};
auto session = makeHTTPSessionImpl(host, port, https, true, resolve_host);
if (!proxy_host.empty())
{
@ -131,8 +137,12 @@ namespace
UInt16 proxy_port_,
bool proxy_https_,
size_t max_pool_size_,
bool resolve_host_ = true)
: Base(static_cast<unsigned>(max_pool_size_), &Poco::Logger::get("HTTPSessionPool"))
bool resolve_host_,
bool wait_on_pool_size_limit)
: Base(
static_cast<unsigned>(max_pool_size_),
&Poco::Logger::get("HTTPSessionPool"),
wait_on_pool_size_limit ? BehaviourOnLimit::Wait : BehaviourOnLimit::AllocateNewBypassingPool)
, host(host_)
, port(port_)
, https(https_)
@ -155,11 +165,12 @@ namespace
String proxy_host;
UInt16 proxy_port;
bool is_proxy_https;
bool wait_on_pool_size_limit;
bool operator ==(const Key & rhs) const
{
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https)
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https);
return std::tie(target_host, target_port, is_target_https, proxy_host, proxy_port, is_proxy_https, wait_on_pool_size_limit)
== std::tie(rhs.target_host, rhs.target_port, rhs.is_target_https, rhs.proxy_host, rhs.proxy_port, rhs.is_proxy_https, rhs.wait_on_pool_size_limit);
}
};
@ -178,6 +189,7 @@ namespace
s.update(k.proxy_host);
s.update(k.proxy_port);
s.update(k.is_proxy_https);
s.update(k.wait_on_pool_size_limit);
return s.get64();
}
};
@ -218,14 +230,14 @@ namespace
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t max_connections_per_endpoint,
bool resolve_host = true)
bool resolve_host,
bool wait_on_pool_size_limit)
{
std::lock_guard lock(mutex);
std::unique_lock lock(mutex);
const std::string & host = uri.getHost();
UInt16 port = uri.getPort();
bool https = isHTTPS(uri);
String proxy_host;
UInt16 proxy_port = 0;
bool proxy_https = false;
@ -236,36 +248,42 @@ namespace
proxy_https = isHTTPS(proxy_uri);
}
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https};
HTTPSessionPool::Key key{host, port, https, proxy_host, proxy_port, proxy_https, wait_on_pool_size_limit};
auto pool_ptr = endpoints_pool.find(key);
if (pool_ptr == endpoints_pool.end())
std::tie(pool_ptr, std::ignore) = endpoints_pool.emplace(
key, std::make_shared<SingleEndpointHTTPSessionPool>(host, port, https, proxy_host, proxy_port, proxy_https, max_connections_per_endpoint, resolve_host));
key,
std::make_shared<SingleEndpointHTTPSessionPool>(
host,
port,
https,
proxy_host,
proxy_port,
proxy_https,
max_connections_per_endpoint,
resolve_host,
wait_on_pool_size_limit));
/// Some routines hold session objects until the end of their lifetime, and may create more sessions in that time frame.
/// If such a routine holds `lock` while it waits on another lock inside `pool_ptr->second->get`, it is impossible to create any
/// new session, so the routine can never finish, return its session to the pool, and unlock the thread waiting inside `pool_ptr->second->get`.
/// To avoid such a deadlock we release `lock` before entering `pool_ptr->second->get`.
lock.unlock();
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
auto session = pool_ptr->second->get(retry_timeout);
/// We store exception messages in session data.
/// Poco HTTPSession also stores the exception, but it can be removed at any time.
const auto & session_data = session->sessionData();
if (!session_data.empty())
if (session_data.empty() || !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
{
auto msg = Poco::AnyCast<std::string>(session_data);
if (!msg.empty())
{
LOG_TRACE((&Poco::Logger::get("HTTPCommon")), "Failed communicating with {} with error '{}' will try to reconnect session", host, msg);
session->reset();
if (resolve_host)
{
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
}
}
/// Reset the message once it has been printed;
/// otherwise you will get reports for failed parts over and over,
/// even for different tables (since they use the same session).
session->attachSessionData({});
if (resolve_host)
updateHostIfIpChanged(session, DNSResolver::instance().resolveHost(host).toString());
}
session->attachSessionData({});
setTimeouts(*session, timeouts);
return session;
@ -295,14 +313,25 @@ HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts &
}
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host,
bool wait_on_pool_size_limit)
{
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host);
return makePooledHTTPSession(uri, {}, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
}
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host)
PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host,
bool wait_on_pool_size_limit)
{
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host);
return HTTPSessionPool::instance().getSession(uri, proxy_uri, timeouts, per_endpoint_pool_size, resolve_host, wait_on_pool_size_limit);
}
bool isRedirect(const Poco::Net::HTTPResponse::HTTPStatus status) { return status == Poco::Net::HTTPResponse::HTTP_MOVED_PERMANENTLY || status == Poco::Net::HTTPResponse::HTTP_FOUND || status == Poco::Net::HTTPResponse::HTTP_SEE_OTHER || status == Poco::Net::HTTPResponse::HTTP_TEMPORARY_REDIRECT; }
@ -351,4 +380,24 @@ Exception HTTPException::makeExceptionMessage(
uri, static_cast<int>(http_status), reason, body);
}
void markSessionForReuse(Poco::Net::HTTPSession & session)
{
const auto & session_data = session.sessionData();
if (!session_data.empty() && !Poco::AnyCast<HTTPSessionReuseTag>(&session_data))
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Data of an unexpected type ({}) is attached to the session", session_data.type().name());
session.attachSessionData(HTTPSessionReuseTag{});
}
void markSessionForReuse(HTTPSessionPtr session)
{
markSessionForReuse(*session);
}
void markSessionForReuse(PooledHTTPSessionPtr session)
{
markSessionForReuse(static_cast<Poco::Net::HTTPSession &>(*session));
}
}

View File

@ -55,14 +55,38 @@ private:
using PooledHTTPSessionPtr = PoolBase<Poco::Net::HTTPClientSession>::Entry; // SingleEndpointHTTPSessionPool::Entry
using HTTPSessionPtr = std::shared_ptr<Poco::Net::HTTPClientSession>;
/// If a session has this tag attached, it will be reused without calling `reset()` on it.
/// No pooled session has this tag attached right after being taken from a pool.
/// If the request and the response were fully written/read, the client code should add this tag
/// explicitly by calling `markSessionForReuse()`.
struct HTTPSessionReuseTag
{
};
void markSessionForReuse(HTTPSessionPtr session);
void markSessionForReuse(PooledHTTPSessionPtr session);
void setResponseDefaultHeaders(HTTPServerResponse & response, size_t keep_alive_timeout);
/// Create a session object to perform requests and set the required parameters.
HTTPSessionPtr makeHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, bool resolve_host = true);
/// Like the previous method, but takes the session from a pool, in variants without and with a proxy URI.
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
PooledHTTPSessionPtr makePooledHTTPSession(const Poco::URI & uri, const Poco::URI & proxy_uri, const ConnectionTimeouts & timeouts, size_t per_endpoint_pool_size, bool resolve_host = true);
PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host = true,
bool wait_on_pool_size_limit = true);
PooledHTTPSessionPtr makePooledHTTPSession(
const Poco::URI & uri,
const Poco::URI & proxy_uri,
const ConnectionTimeouts & timeouts,
size_t per_endpoint_pool_size,
bool resolve_host = true,
bool wait_on_pool_size_limit = true);
bool isRedirect(Poco::Net::HTTPResponse::HTTPStatus status);
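Two behaviours change together here: a pool at its size limit can now allocate extra sessions instead of blocking (`wait_on_pool_size_limit`, which is therefore part of the pool key), and a returned session keeps its connection only if the caller tagged it with `markSessionForReuse()`. A reduced model of the reuse-tag contract (`std::any` stands in for Poco's session data):

#include <any>
#include <iostream>

struct HTTPSessionReuseTag {};

struct Session
{
    std::any data;
    bool was_reset = false;
    void reset() { was_reset = true; } // drop the underlying connection
};

// On return to the pool: keep the connection only if the caller promised
// that the request/response cycle completed cleanly.
void returnToPool(Session & s)
{
    if (!std::any_cast<HTTPSessionReuseTag>(&s.data))
        s.reset();  // unknown state: the connection cannot be trusted
    s.data.reset(); // strip the tag; the next user must mark it again
}

int main()
{
    Session finished, abandoned;
    finished.data = HTTPSessionReuseTag{}; // markSessionForReuse()
    returnToPool(finished);
    returnToPool(abandoned);
    std::cout << finished.was_reset << ' ' << abandoned.was_reset << '\n'; // 0 1
}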

View File

@ -1,5 +1,6 @@
#include "config.h"
#include <IO/HTTPCommon.h>
#include <IO/S3Common.h>
#include "config.h"
#if USE_AWS_S3
@ -24,6 +25,8 @@ namespace ProfileEvents
extern const Event ReadBufferFromS3InitMicroseconds;
extern const Event ReadBufferFromS3Bytes;
extern const Event ReadBufferFromS3RequestsErrors;
extern const Event ReadBufferFromS3ResetSessions;
extern const Event ReadBufferFromS3PreservedSessions;
extern const Event ReadBufferSeekCancelConnection;
extern const Event S3GetObject;
extern const Event DiskS3GetObject;
@ -31,6 +34,46 @@ namespace ProfileEvents
extern const Event RemoteReadThrottlerSleepMicroseconds;
}
namespace
{
DB::PooledHTTPSessionPtr getSession(Aws::S3::Model::GetObjectResult & read_result)
{
if (auto * session_aware_stream = dynamic_cast<DB::S3::SessionAwareIOStream<DB::PooledHTTPSessionPtr> *>(&read_result.GetBody()))
return static_cast<DB::PooledHTTPSessionPtr &>(session_aware_stream->getSession());
else if (!dynamic_cast<DB::S3::SessionAwareIOStream<DB::HTTPSessionPtr> *>(&read_result.GetBody()))
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Session of unexpected type encountered");
return {};
}
void resetSession(Aws::S3::Model::GetObjectResult & read_result)
{
if (auto session = getSession(read_result); !session.isNull())
{
auto & http_session = static_cast<Poco::Net::HTTPClientSession &>(*session);
http_session.reset();
}
}
void resetSessionIfNeeded(bool read_all_range_successfully, std::optional<Aws::S3::Model::GetObjectResult> & read_result)
{
if (!read_result)
return;
if (!read_all_range_successfully)
{
/// When we abandon a session with an ongoing GetObject request while another client is trying to delete the same object, the delete
/// operation will hang until the GetObject session hits its idle timeout. So we have to call `reset()` on the GetObject session immediately.
resetSession(*read_result);
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3ResetSessions);
}
else if (auto session = getSession(*read_result); !session.isNull())
{
DB::markSessionForReuse(session);
ProfileEvents::increment(ProfileEvents::ReadBufferFromS3PreservedSessions);
}
}
}
namespace DB
{
namespace ErrorCodes
@ -154,7 +197,10 @@ bool ReadBufferFromS3::nextImpl()
}
if (!next_result)
{
read_all_range_successfully = true;
return false;
}
BufferBase::set(impl->buffer().begin(), impl->buffer().size(), impl->offset());
@ -240,6 +286,8 @@ off_t ReadBufferFromS3::seek(off_t offset_, int whence)
if (offset_ == getPosition() && whence == SEEK_SET)
return offset_;
read_all_range_successfully = false;
if (impl && restricted_seek)
{
throw Exception(
@ -312,6 +360,8 @@ void ReadBufferFromS3::setReadUntilPosition(size_t position)
{
if (position != static_cast<size_t>(read_until_position))
{
read_all_range_successfully = false;
if (impl)
{
if (!atEndOfRequestedRangeGuess())
@ -328,6 +378,8 @@ void ReadBufferFromS3::setReadUntilEnd()
{
if (read_until_position)
{
read_all_range_successfully = false;
read_until_position = 0;
if (impl)
{
@ -351,8 +403,23 @@ bool ReadBufferFromS3::atEndOfRequestedRangeGuess()
return false;
}
ReadBufferFromS3::~ReadBufferFromS3()
{
try
{
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
}
catch (...)
{
tryLogCurrentException(log);
}
}
std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
{
resetSessionIfNeeded(readAllRangeSuccessfully(), read_result);
read_all_range_successfully = false;
/**
* If remote_filesystem_read_method = 'threadpool', then for MergeTree family tables
* exact byte ranges to read are always passed here.
@ -363,7 +430,7 @@ std::unique_ptr<ReadBuffer> ReadBufferFromS3::initialize()
read_result = sendRequest(offset, read_until_position ? std::make_optional(read_until_position - 1) : std::nullopt);
size_t buffer_size = use_external_buffer ? 0 : read_settings.remote_fs_buffer_size;
return std::make_unique<ReadBufferFromIStream>(read_result.GetBody(), buffer_size);
return std::make_unique<ReadBufferFromIStream>(read_result->GetBody(), buffer_size);
}
Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const
@ -415,6 +482,10 @@ Aws::S3::Model::GetObjectResult ReadBufferFromS3::sendRequest(size_t range_begin
}
}
bool ReadBufferFromS3::readAllRangeSuccessfully() const
{
return read_until_position ? offset == read_until_position : read_all_range_successfully;
}
}
#endif
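
Restated outside the class, the reset-or-preserve decision above reduces to a single predicate (the free-function and parameter names here are illustrative, not the source's):

#include <sys/types.h>

bool shouldPreserveSession(off_t offset, off_t read_until_position, bool saw_end_of_stream)
{
    /// With an explicit right bound, trust the position check; otherwise rely on
    /// the flag that nextImpl() sets when the stream is exhausted.
    return read_until_position ? offset == read_until_position : saw_end_of_stream;
}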

View File

@ -41,7 +41,7 @@ private:
std::atomic<off_t> offset = 0;
std::atomic<off_t> read_until_position = 0;
Aws::S3::Model::GetObjectResult read_result;
std::optional<Aws::S3::Model::GetObjectResult> read_result;
std::unique_ptr<ReadBuffer> impl;
Poco::Logger * log = &Poco::Logger::get("ReadBufferFromS3");
@ -60,6 +60,8 @@ public:
bool restricted_seek_ = false,
std::optional<size_t> file_size = std::nullopt);
~ReadBufferFromS3() override;
bool nextImpl() override;
off_t seek(off_t off, int whence) override;
@ -93,6 +95,8 @@ private:
Aws::S3::Model::GetObjectResult sendRequest(size_t range_begin, std::optional<size_t> range_end_incl) const;
bool readAllRangeSuccessfully() const;
ReadSettings read_settings;
bool use_external_buffer;
@ -100,6 +104,8 @@ private:
/// There are different seek policies for disk seek and for non-disk seek
/// (non-disk seek is applied for seekable input formats: orc, arrow, parquet).
bool restricted_seek;
bool read_all_range_successfully = false;
};
}
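
The header changes encode a simple contract, sketched below under assumed names: a buffer drained to the end of its requested range lets the destructor preserve the pooled session; anything less makes the destructor reset the connection.

#include <IO/ReadBufferFromS3.h>

void drainFully(DB::ReadBufferFromS3 & buf)
{
    while (!buf.eof())
        buf.ignore(buf.available()); /// consume the rest of the requested range
}
/// When `buf` goes out of scope, ~ReadBufferFromS3() sees the range was fully
/// read and preserves the pooled session instead of resetting it.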

View File

@ -1,8 +1,11 @@
#include "ReadWriteBufferFromHTTP.h"
#include <IO/HTTPCommon.h>
namespace ProfileEvents
{
extern const Event ReadBufferSeekCancelConnection;
extern const Event ReadWriteBufferFromHTTPPreservedSessions;
}
namespace DB
@ -146,30 +149,20 @@ std::istream * ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::callImpl(
LOG_TRACE(log, "Sending request to {}", uri_.toString());
auto sess = current_session->getSession();
try
{
auto & stream_out = sess->sendRequest(request);
auto & stream_out = sess->sendRequest(request);
if (out_stream_callback)
out_stream_callback(stream_out);
if (out_stream_callback)
out_stream_callback(stream_out);
auto result_istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
auto result_istr = receiveResponse(*sess, request, response, true);
response.getCookies(cookies);
/// we can fetch object info while the request is being processed
/// and we don't want to override any context used by it
if (!for_object_info)
content_encoding = response.get("Content-Encoding", "");
/// we can fetch object info while the request is being processed
/// and we don't want to override any context used by it
if (!for_object_info)
content_encoding = response.get("Content-Encoding", "");
return result_istr;
}
catch (const Poco::Exception & e)
{
/// We use the session data storage as storage for the exception text.
/// Depending on it, we can decide whether to reconnect the session or re-resolve the session host.
sess->attachSessionData(e.message());
throw;
}
return result_istr;
}
template <typename UpdatableSessionPtr>
@ -429,23 +422,10 @@ void ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::initialize()
if (!read_range.end && response.hasContentLength())
file_info = parseFileInfo(response, withPartialContent(read_range) ? getOffset() : 0);
try
{
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
impl = std::make_unique<ReadBufferFromIStream>(*istr, buffer_size);
if (use_external_buffer)
{
setupExternalBuffer();
}
}
catch (const Poco::Exception & e)
{
/// We use the session data storage as storage for the exception text.
/// Depending on it, we can decide whether to reconnect the session or re-resolve the session host.
auto sess = session->getSession();
sess->attachSessionData(e.message());
throw;
}
if (use_external_buffer)
setupExternalBuffer();
}
template <typename UpdatableSessionPtr>
@ -460,7 +440,12 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
if ((read_range.end && getOffset() > read_range.end.value()) ||
(file_info && file_info->file_size && getOffset() >= file_info->file_size.value()))
{
/// Response was fully read.
markSessionForReuse(session->getSession());
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
return false;
}
if (impl)
{
@ -582,7 +567,12 @@ bool ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::nextImpl()
std::rethrow_exception(exception);
if (!result)
{
/// EOF is reached, i.e. the response was fully read.
markSessionForReuse(session->getSession());
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
return false;
}
internal_buffer = impl->buffer();
working_buffer = internal_buffer;
@ -635,12 +625,17 @@ size_t ReadWriteBufferFromHTTPBase<UpdatableSessionPtr>::readBigAt(char * to, si
bool cancelled;
size_t r = copyFromIStreamWithProgressCallback(*result_istr, to, n, progress_callback, &cancelled);
if (!cancelled)
{
/// Response was fully read.
markSessionForReuse(sess);
ProfileEvents::increment(ProfileEvents::ReadWriteBufferFromHTTPPreservedSessions);
}
return r;
}
catch (const Poco::Exception & e)
{
sess->attachSessionData(e.message());
LOG_ERROR(
log,
"HTTP request (positioned) to `{}` with range [{}, {}) failed at try {}/{}: {}",

View File

@ -100,7 +100,7 @@ std::unique_ptr<Client> Client::create(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
{
@ -109,9 +109,16 @@ std::unique_ptr<Client> Client::create(
new Client(max_redirects_, std::move(sse_kms_config_), credentials_provider, client_configuration, sign_payloads, use_virtual_addressing));
}
std::unique_ptr<Client> Client::create(const Client & other)
std::unique_ptr<Client> Client::clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy,
std::optional<Int64> override_request_timeout_ms) const
{
return std::unique_ptr<Client>(new Client(other));
PocoHTTPClientConfiguration new_configuration = client_configuration;
if (override_retry_strategy.has_value())
new_configuration.retryStrategy = *override_retry_strategy;
if (override_request_timeout_ms.has_value())
new_configuration.requestTimeoutMs = *override_request_timeout_ms;
return std::unique_ptr<Client>(new Client(*this, new_configuration));
}
namespace
@ -134,11 +141,14 @@ Client::Client(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
: Aws::S3::S3Client(credentials_provider_, client_configuration, std::move(sign_payloads), use_virtual_addressing)
const PocoHTTPClientConfiguration & client_configuration_,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads_,
bool use_virtual_addressing_)
: Aws::S3::S3Client(credentials_provider_, client_configuration_, sign_payloads_, use_virtual_addressing_)
, credentials_provider(credentials_provider_)
, client_configuration(client_configuration_)
, sign_payloads(sign_payloads_)
, use_virtual_addressing(use_virtual_addressing_)
, max_redirects(max_redirects_)
, sse_kms_config(std::move(sse_kms_config_))
, log(&Poco::Logger::get("S3Client"))
@ -175,10 +185,15 @@ Client::Client(
ClientCacheRegistry::instance().registerClient(cache);
}
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
Client::Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration_)
: Aws::S3::S3Client(other.credentials_provider, client_configuration_, other.sign_payloads,
other.use_virtual_addressing)
, initial_endpoint(other.initial_endpoint)
, credentials_provider(other.credentials_provider)
, client_configuration(client_configuration_)
, sign_payloads(other.sign_payloads)
, use_virtual_addressing(other.use_virtual_addressing)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, provider_type(other.provider_type)

View File

@ -105,6 +105,8 @@ private:
class Client : private Aws::S3::S3Client
{
public:
class RetryStrategy;
/// We use a factory method to verify arguments before creating a client, because
/// there are certain requirements on the arguments for it to work correctly,
/// e.g. Client::RetryStrategy should be used
@ -112,11 +114,19 @@ public:
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const Aws::Client::ClientConfiguration & client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
static std::unique_ptr<Client> create(const Client & other);
/// Create a client with adjusted settings:
/// * override_retry_strategy can be used to disable retries, to avoid nested retries when we have
/// a retry loop outside of the S3 client. Specifically, for the read and write buffers. Currently not
/// actually used.
/// * override_request_timeout_ms is used to increase the timeout for CompleteMultipartUploadRequest,
/// because it often sits idle for 10 seconds: https://github.com/ClickHouse/ClickHouse/pull/42321
std::unique_ptr<Client> clone(
std::optional<std::shared_ptr<RetryStrategy>> override_retry_strategy = std::nullopt,
std::optional<Int64> override_request_timeout_ms = std::nullopt) const;
Client & operator=(const Client &) = delete;
@ -211,11 +221,12 @@ private:
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration& client_configuration,
const PocoHTTPClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
Client(const Client & other);
Client(
const Client & other, const PocoHTTPClientConfiguration & client_configuration);
/// Leave regular functions private so we don't accidentally use them
/// otherwise region and endpoint redirection won't work
@ -251,6 +262,9 @@ private:
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
PocoHTTPClientConfiguration client_configuration;
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads;
bool use_virtual_addressing;
std::string explicit_region;
mutable bool detect_region = true;
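
Following the comment on clone() above, a usage sketch; the 30000 ms value and the wrapper function name are assumptions, not taken from this diff.

#include <IO/S3/Client.h>
#include <optional>
#include <memory>

std::unique_ptr<DB::S3::Client> makeCompletionClient(const DB::S3::Client & base)
{
    /// Keep the default retry strategy, but give CompleteMultipartUploadRequest
    /// more headroom, since it can sit idle for ~10 seconds server-side.
    return base.clone(
        /* override_retry_strategy = */ std::nullopt,
        /* override_request_timeout_ms = */ 30000);
}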

View File

@ -1,3 +1,4 @@
#include <Poco/Timespan.h>
#include "Common/DNSResolver.h"
#include "config.h"
@ -138,8 +139,9 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
, timeouts(ConnectionTimeouts(
Poco::Timespan(client_configuration.connectTimeoutMs * 1000), /// connection timeout.
Poco::Timespan(client_configuration.requestTimeoutMs * 1000), /// send timeout.
Poco::Timespan(client_configuration.requestTimeoutMs * 1000) /// receive timeout.
))
Poco::Timespan(client_configuration.requestTimeoutMs * 1000), /// receive timeout.
Poco::Timespan(client_configuration.enableTcpKeepAlive ? client_configuration.tcpKeepAliveIntervalMs * 1000 : 0),
Poco::Timespan(client_configuration.http_keep_alive_timeout_ms * 1000))) /// the keep-alive flag is set on each session upon creation
, remote_host_filter(client_configuration.remote_host_filter)
, s3_max_redirects(client_configuration.s3_max_redirects)
, enable_s3_requests_logging(client_configuration.enable_s3_requests_logging)
@ -147,6 +149,8 @@ PocoHTTPClient::PocoHTTPClient(const PocoHTTPClientConfiguration & client_config
, get_request_throttler(client_configuration.get_request_throttler)
, put_request_throttler(client_configuration.put_request_throttler)
, extra_headers(client_configuration.extra_headers)
, http_connection_pool_size(client_configuration.http_connection_pool_size)
, wait_on_pool_size_limit(client_configuration.wait_on_pool_size_limit)
{
}
@ -254,9 +258,27 @@ void PocoHTTPClient::addMetric(const Aws::Http::HttpRequest & request, S3MetricT
void PocoHTTPClient::makeRequestInternal(
Aws::Http::HttpRequest & request,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const
{
/// Most sessions in the pool are already connected, and it is not possible to set a proxy host/port on an already connected session.
const auto request_configuration = per_request_configuration(request);
if (http_connection_pool_size && request_configuration.proxy_host.empty())
makeRequestInternalImpl<true>(request, request_configuration, response, readLimiter, writeLimiter);
else
makeRequestInternalImpl<false>(request, request_configuration, response, readLimiter, writeLimiter);
}
template <bool pooled>
void PocoHTTPClient::makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const ClientConfigurationPerRequest & request_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface *,
Aws::Utils::RateLimits::RateLimiterInterface *) const
{
using SessionPtr = std::conditional_t<pooled, PooledHTTPSessionPtr, HTTPSessionPtr>;
Poco::Logger * log = &Poco::Logger::get("AWSClient");
auto uri = request.GetUri().GetURIString();
@ -303,8 +325,7 @@ void PocoHTTPClient::makeRequestInternal(
for (unsigned int attempt = 0; attempt <= s3_max_redirects; ++attempt)
{
Poco::URI target_uri(uri);
HTTPSessionPtr session;
auto request_configuration = per_request_configuration(request);
SessionPtr session;
if (!request_configuration.proxy_host.empty())
{
@ -313,7 +334,11 @@ void PocoHTTPClient::makeRequestInternal(
/// A reverse proxy can replace the Host header with the resolved IP address instead of the host name.
/// This can lead to a request signature mismatch on the S3 side.
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
bool use_tunnel = request_configuration.proxy_scheme == Aws::Http::Scheme::HTTP && target_uri.getScheme() == "https";
session->setProxy(
@ -325,7 +350,11 @@ void PocoHTTPClient::makeRequestInternal(
}
else
{
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ true);
if constexpr (pooled)
session = makePooledHTTPSession(
target_uri, timeouts, http_connection_pool_size, /* resolve_host = */ true, wait_on_pool_size_limit);
else
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
}
/// In case of error this address will be written to logs
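
The template dispatch above boils down to a small compile-time pattern; here it is in isolation (the types and names are toy stand-ins, not the source's):

#include <memory>
#include <type_traits>

struct PlainSession {};
struct PooledSession {};

template <bool pooled>
void doRequest()
{
    using SessionPtr = std::conditional_t<pooled, std::shared_ptr<PooledSession>, std::shared_ptr<PlainSession>>;
    SessionPtr session;
    if constexpr (pooled)
        session = std::make_shared<PooledSession>(); /// taken from the pool in the real code
    else
        session = std::make_shared<PlainSession>();  /// a fresh connection, as before
    /// ... send the request and read the response through `session` ...
}

void dispatch(bool use_pool)
{
    /// The runtime flag picks the instantiation, mirroring makeRequestInternal() above.
    if (use_pool)
        doRequest<true>();
    else
        doRequest<false>();
}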

View File

@ -53,6 +53,13 @@ struct PocoHTTPClientConfiguration : public Aws::Client::ClientConfiguration
ThrottlerPtr put_request_throttler;
HTTPHeaderEntries extra_headers;
/// Not a client parameter in terms of HTTP; we won't send it to the server. Used internally to determine when the connection has to be re-established.
uint32_t http_keep_alive_timeout_ms = 0;
/// Zero means pooling will not be used.
size_t http_connection_pool_size = 0;
/// See PoolBase::BehaviourOnLimit
bool wait_on_pool_size_limit = true;
void updateSchemeAndRegion();
std::function<void(const ClientConfigurationPerRequest &)> error_report;
@ -90,6 +97,12 @@ public:
);
}
void SetResponseBody(Aws::IStream & incoming_stream, PooledHTTPSessionPtr & session_) /// NOLINT
{
body_stream = Aws::Utils::Stream::ResponseStream(
Aws::New<SessionAwareIOStream<PooledHTTPSessionPtr>>("http result streambuf", session_, incoming_stream.rdbuf()));
}
void SetResponseBody(std::string & response_body) /// NOLINT
{
auto stream = Aws::New<std::stringstream>("http result buf", response_body); // STYLE_CHECK_ALLOW_STD_STRING_STREAM
@ -149,6 +162,15 @@ private:
EnumSize,
};
template <bool pooled>
void makeRequestInternalImpl(
Aws::Http::HttpRequest & request,
const ClientConfigurationPerRequest & per_request_configuration,
std::shared_ptr<PocoHTTPResponse> & response,
Aws::Utils::RateLimits::RateLimiterInterface * readLimiter,
Aws::Utils::RateLimits::RateLimiterInterface * writeLimiter) const;
protected:
static S3MetricKind getMetricKind(const Aws::Http::HttpRequest & request);
void addMetric(const Aws::Http::HttpRequest & request, S3MetricType type, ProfileEvents::Count amount = 1) const;
@ -170,6 +192,9 @@ private:
ThrottlerPtr put_request_throttler;
const HTTPHeaderEntries extra_headers;
size_t http_connection_pool_size = 0;
bool wait_on_pool_size_limit = true;
};
}
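
With the new fields in place, enabling pooling comes down to three assignments; a sketch assuming a configuration object obtained from the usual factory (the values and the function name are illustrative):

#include <IO/S3/PocoHTTPClient.h>

void tunePooling(DB::S3::PocoHTTPClientConfiguration & config)
{
    config.http_connection_pool_size = 1000;   /// zero keeps pooling disabled
    config.wait_on_pool_size_limit = true;     /// block when every pooled session is busy
    config.http_keep_alive_timeout_ms = 10000; /// re-establish connections idle longer than 10 s
}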

View File

@ -18,6 +18,10 @@ public:
{
}
Session & getSession() { return session; }
const Session & getSession() const { return session; }
private:
/// The Poco HTTP session is the holder of the response stream.
Session session;
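
The class above exists so that the connection outlives any early pool return while its body is still being read; a self-contained toy analogue (not the source's implementation):

#include <istream>
#include <utility>

template <typename Session>
class SessionBoundStream : public std::istream
{
public:
    SessionBoundStream(Session session_, std::streambuf * buf)
        : std::istream(buf), session(std::move(session_))
    {
    }

    Session & getSession() { return session; }
    const Session & getSession() const { return session; }

private:
    Session session; /// keeps the connection alive for the stream's lifetime
};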

View File

@ -89,6 +89,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
DB::S3Settings::RequestSettings request_settings;
request_settings.max_unexpected_write_error_retries = max_unexpected_write_error_retries;
DB::WriteBufferFromS3 write_buffer(
client,
client,
uri.bucket,
uri.key,

Some files were not shown because too many files have changed in this diff.