Merge branch 'master' into clickhouse_as_library_2

Alexey Milovidov 2023-07-09 08:47:53 +03:00 committed by GitHub
commit d2fca36983
377 changed files with 3738 additions and 2451 deletions


@ -850,6 +850,48 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinRISCV64:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_riscv64
EOF
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -932,6 +974,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
- BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy


@ -75,51 +75,6 @@ jobs:
Codebrowser:
needs: [DockerHubPush]
uses: ./.github/workflows/woboq.yml
BuilderCoverity:
needs: DockerHubPush
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
BUILD_NAME=coverity
CACHES_PATH=${{runner.temp}}/../ccaches
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
TEMP_PATH=${{runner.temp}}/build_check
EOF
echo "COVERITY_TOKEN=${{ secrets.COVERITY_TOKEN }}" >> "$GITHUB_ENV"
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload Coverity Analysis
if: ${{ success() || failure() }}
run: |
curl --form token="${COVERITY_TOKEN}" \
--form email='security+coverity@clickhouse.com' \
--form file="@$TEMP_PATH/$BUILD_NAME/coverity-scan.tar.gz" \
--form version="${GITHUB_REF#refs/heads/}-${GITHUB_SHA::6}" \
--form description="Nighly Scan: $(date +'%Y-%m-%dT%H:%M:%S')" \
https://scan.coverity.com/builds?project=ClickHouse%2FClickHouse
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
SonarCloud:
runs-on: [self-hosted, builder]
env:


@ -911,6 +911,47 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinRISCV64:
needs: [DockerHubPush, FastTest, StyleCheck]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
BUILD_NAME=binary_riscv64
EOF
- name: Download changed images
uses: actions/download-artifact@v3
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
submodules: true
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v3
with:
name: ${{ env.BUILD_URLS }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
############################################################################################
##################################### Docker images #######################################
############################################################################################
@ -992,6 +1033,7 @@ jobs:
- BuilderBinDarwinAarch64
- BuilderBinFreeBSD
- BuilderBinPPC64
- BuilderBinRISCV64
- BuilderBinAmd64Compat
- BuilderBinAarch64V80Compat
- BuilderBinClangTidy


@ -87,7 +87,6 @@ if (ENABLE_FUZZING)
set (ENABLE_CLICKHOUSE_ODBC_BRIDGE OFF)
set (ENABLE_LIBRARIES 0)
set (ENABLE_SSL 1)
set (USE_UNWIND ON)
set (ENABLE_EMBEDDED_COMPILER 0)
set (ENABLE_EXAMPLES 0)
set (ENABLE_UTILS 0)
@ -344,9 +343,9 @@ if (COMPILER_CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fdiagnostics-absolute-paths")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fdiagnostics-absolute-paths")
if (NOT ENABLE_TESTS AND NOT SANITIZE)
if (NOT ENABLE_TESTS AND NOT SANITIZE AND OS_LINUX)
# https://clang.llvm.org/docs/ThinLTO.html
# Applies to clang only.
# Applies to clang and linux only.
# Disabled when building with tests or sanitizers.
option(ENABLE_THINLTO "Clang-specific link time optimization" ON)
endif()


@ -15,9 +15,8 @@
static thread_local uint64_t current_tid = 0;
uint64_t getThreadId()
{
if (!current_tid)
static void setCurrentThreadId()
{
#if defined(OS_ANDROID)
current_tid = gettid();
@ -35,5 +34,15 @@ uint64_t getThreadId()
#endif
}
uint64_t getThreadId()
{
if (!current_tid)
setCurrentThreadId();
return current_tid;
}
void updateCurrentThreadIdAfterFork()
{
setCurrentThreadId();
}


@ -3,3 +3,5 @@
/// Obtain thread id from OS. The value is cached in thread local variable.
uint64_t getThreadId();
void updateCurrentThreadIdAfterFork();


@ -15,6 +15,7 @@ set(CMAKE_OSX_DEPLOYMENT_TARGET 10.15)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
include (cmake/unwind.cmake)
include (cmake/cxx.cmake)
link_libraries(global-group)


@ -18,6 +18,9 @@ if (NOT PARALLEL_COMPILE_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_COMPILER_MEMORY)
if (NOT PARALLEL_COMPILE_JOBS)
set (PARALLEL_COMPILE_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_COMPILE_JOBS_LESS TRUE)
endif()
endif ()
if (PARALLEL_COMPILE_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES))
@ -33,6 +36,9 @@ if (NOT PARALLEL_LINK_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_LINKER_MEMORY)
if (NOT PARALLEL_LINK_JOBS)
set (PARALLEL_LINK_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_LINK_JOBS_LESS TRUE)
endif()
endif ()
# ThinLTO provides its own parallel linking
@ -56,4 +62,10 @@ if (PARALLEL_COMPILE_JOBS OR PARALLEL_LINK_JOBS)
message(STATUS
"${CMAKE_CURRENT_SOURCE_DIR}: Have ${TOTAL_PHYSICAL_MEMORY} megabytes of memory.
Limiting concurrent linkers jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS} (system has ${NUMBER_OF_LOGICAL_CORES} logical cores)")
if (PARALLEL_COMPILE_JOBS_LESS)
message(WARNING "The autocalculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
if (PARALLEL_LINK_JOBS_LESS)
message(WARNING "The autocalculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()
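If the new warnings above fire, the autocalculated limits can be overridden at configure time. A minimal sketch, not part of this diff; the job counts and the `build` directory name are illustrative assumptions:

```bash
# Override the autocalculated limits when the machine has enough memory for more jobs.
cmake . -Bbuild -G Ninja -DPARALLEL_COMPILE_JOBS=16 -DPARALLEL_LINK_JOBS=4
ninja -C build
```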


@ -33,6 +33,18 @@ if (CMAKE_CROSSCOMPILING)
elseif (ARCH_PPC64LE)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")
elseif (ARCH_RISCV64)
# RISC-V support is preliminary
set (GLIBC_COMPATIBILITY OFF CACHE INTERNAL "")
set (ENABLE_LDAP OFF CACHE INTERNAL "")
set (OPENSSL_NO_ASM ON CACHE INTERNAL "")
set (ENABLE_JEMALLOC ON CACHE INTERNAL "")
set (ENABLE_PARQUET OFF CACHE INTERNAL "")
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_HDFS OFF CACHE INTERNAL "")
set (ENABLE_MYSQL OFF CACHE INTERNAL "")
# It might be ok, but we need to update 'sysroot'
set (ENABLE_RUST OFF CACHE INTERNAL "")
elseif (ARCH_S390X)
set (ENABLE_GRPC OFF CACHE INTERNAL "")
set (ENABLE_SENTRY OFF CACHE INTERNAL "")


@ -1,13 +1 @@
option (USE_UNWIND "Enable libunwind (better stacktraces)" ${ENABLE_LIBRARIES})
if (USE_UNWIND)
add_subdirectory(contrib/libunwind-cmake)
set (UNWIND_LIBRARIES unwind)
set (EXCEPTION_HANDLING_LIBRARY ${UNWIND_LIBRARIES})
message (STATUS "Using libunwind: ${UNWIND_LIBRARIES}")
else ()
set (EXCEPTION_HANDLING_LIBRARY gcc_eh)
endif ()
message (STATUS "Using exception handler: ${EXCEPTION_HANDLING_LIBRARY}")


@ -170,16 +170,13 @@ endif ()
target_compile_definitions(_jemalloc PRIVATE -DJEMALLOC_PROF=1)
if (USE_UNWIND)
# jemalloc provides support for two different libunwind flavors: the original HP libunwind and the one coming with gcc / g++ / libstdc++.
# The latter is identified by `JEMALLOC_PROF_LIBGCC` and uses `_Unwind_Backtrace` method instead of `unw_backtrace`.
# At the time ClickHouse uses LLVM libunwind which follows libgcc's way of backtracing.
#
# ClickHouse has to provide `unw_backtrace` method by the means of [commit 8e2b31e](https://github.com/ClickHouse/libunwind/commit/8e2b31e766dd502f6df74909e04a7dbdf5182eb1).
target_compile_definitions (_jemalloc PRIVATE -DJEMALLOC_PROF_LIBGCC=1)
target_link_libraries (_jemalloc PRIVATE unwind)
endif ()
# for RTLD_NEXT
target_compile_options(_jemalloc PRIVATE -D_GNU_SOURCE)


@ -61,9 +61,7 @@ target_include_directories(cxx SYSTEM BEFORE PUBLIC $<$<COMPILE_LANGUAGE:CXX>:$
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
# Enable capturing stack traces for all exceptions.
if (USE_UNWIND)
target_compile_definitions(cxx PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
endif ()
if (USE_MUSL)
target_compile_definitions(cxx PUBLIC -D_LIBCPP_HAS_MUSL_LIBC=1)


@ -35,12 +35,10 @@ target_include_directories(cxxabi SYSTEM BEFORE
)
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.
target_link_libraries(cxxabi PUBLIC ${EXCEPTION_HANDLING_LIBRARY})
target_link_libraries(cxxabi PUBLIC unwind)
# Enable capturing stack traces for all exceptions.
if (USE_UNWIND)
target_compile_definitions(cxxabi PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
endif ()
install(
TARGETS cxxabi


@ -32,7 +32,7 @@ RUN arch=${TARGETARCH:-amd64} \
esac
ARG REPOSITORY="https://s3.amazonaws.com/clickhouse-builds/22.4/31c367d3cd3aefd316778601ff6565119fe36682/package_release"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-keeper"
# user/group precreated explicitly with fixed uid/gid on purpose.


@ -49,8 +49,8 @@ ENV CARGO_HOME=/rust/cargo
ENV PATH="/rust/cargo/bin:${PATH}"
RUN curl https://sh.rustup.rs -sSf | bash -s -- -y && \
chmod 777 -R /rust && \
rustup toolchain install nightly && \
rustup default nightly && \
rustup toolchain install nightly-2023-07-04 && \
rustup default nightly-2023-07-04 && \
rustup component add rust-src && \
rustup target add aarch64-unknown-linux-gnu && \
rustup target add x86_64-apple-darwin && \


@ -138,6 +138,7 @@ def parse_env_variables(
ARM_V80COMPAT_SUFFIX = "-aarch64-v80compat"
FREEBSD_SUFFIX = "-freebsd"
PPC_SUFFIX = "-ppc64le"
RISCV_SUFFIX = "-riscv64"
AMD64_COMPAT_SUFFIX = "-amd64-compat"
result = []
@ -150,6 +151,7 @@ def parse_env_variables(
is_cross_arm = compiler.endswith(ARM_SUFFIX)
is_cross_arm_v80compat = compiler.endswith(ARM_V80COMPAT_SUFFIX)
is_cross_ppc = compiler.endswith(PPC_SUFFIX)
is_cross_riscv = compiler.endswith(RISCV_SUFFIX)
is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
is_amd64_compat = compiler.endswith(AMD64_COMPAT_SUFFIX)
@ -206,6 +208,11 @@ def parse_env_variables(
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-ppc64le.cmake"
)
elif is_cross_riscv:
cc = compiler[: -len(RISCV_SUFFIX)]
cmake_flags.append(
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-riscv64.cmake"
)
elif is_amd64_compat:
cc = compiler[: -len(AMD64_COMPAT_SUFFIX)]
result.append("DEB_ARCH=amd64")
@ -370,6 +377,7 @@ def parse_args() -> argparse.Namespace:
"clang-16-aarch64",
"clang-16-aarch64-v80compat",
"clang-16-ppc64le",
"clang-16-riscv64",
"clang-16-amd64-compat",
"clang-16-freebsd",
),


@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
# lts / testing / prestable / etc
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# user/group precreated explicitly with fixed uid/gid on purpose.


@ -23,7 +23,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
ARG REPO_CHANNEL="stable"
ARG REPOSITORY="deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
ARG VERSION="23.6.1.1524"
ARG VERSION="23.6.2.18"
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
# set non-empty deb_location_url url to create a docker image


@ -166,7 +166,6 @@ function run_cmake
"-DENABLE_UTILS=0"
"-DENABLE_EMBEDDED_COMPILER=0"
"-DENABLE_THINLTO=0"
"-DUSE_UNWIND=1"
"-DENABLE_NURAFT=1"
"-DENABLE_SIMDJSON=1"
"-DENABLE_JEMALLOC=1"


@ -291,7 +291,7 @@ quit
if [ "$server_died" == 1 ]
then
# The server has died.
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*' server.log > description.txt
if ! rg --text -o 'Received signal.*|Logical error.*|Assertion.*failed|Failed assertion.*|.*runtime error: .*|.*is located.*|(SUMMARY|ERROR): [a-zA-Z]+Sanitizer:.*|.*_LIBCPP_ASSERT.*|.*Child process was terminated by signal 9.*' server.log > description.txt
then
echo "Lost connection to server. See the logs." > description.txt
fi


@ -92,8 +92,8 @@ sudo clickhouse stop ||:
for _ in $(seq 1 60); do if [[ $(wget --timeout=1 -q 'localhost:8123' -O-) == 'Ok.' ]]; then sleep 1 ; else break; fi ; done
grep -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
pigz < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.gz &
rg -Fa "Fatal" /var/log/clickhouse-server/clickhouse-server.log ||:
zstd < /var/log/clickhouse-server/clickhouse-server.log > /test_output/clickhouse-server.log.zst &
# Compressed (FIXME: remove once only github actions will be left)
rm /var/log/clickhouse-server/clickhouse-server.log


@ -33,7 +33,6 @@ RUN apt-get update -y \
qemu-user-static \
sqlite3 \
sudo \
telnet \
tree \
unixodbc \
wget \


@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean


@ -8,8 +8,6 @@ RUN apt-get update -y \
apt-get install --yes --no-install-recommends \
bash \
tzdata \
fakeroot \
debhelper \
parallel \
expect \
python3 \
@ -20,7 +18,6 @@ RUN apt-get update -y \
sudo \
openssl \
netcat-openbsd \
telnet \
brotli \
&& apt-get clean


@ -67,6 +67,13 @@ start
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.initial.log
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
# force_sync=false doesn't work correctly on some older versions
sudo cat /etc/clickhouse-server/config.d/keeper_port.xml \
| sed "s|<force_sync>false</force_sync>|<force_sync>true</force_sync>|" \
@ -81,13 +88,6 @@ mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/cli
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
# Start server from previous release
# Let's enable S3 storage by default
export USE_S3_STORAGE_FOR_MERGE_TREE=1
# Previous version may not be ready for fault injections
export ZOOKEEPER_FAULT_INJECTION=0
configure
# it contains some new settings, but we can safely remove it
rm /etc/clickhouse-server/config.d/merge_tree.xml
rm /etc/clickhouse-server/users.d/nonconst_timezone.xml


@ -44,7 +44,6 @@ RUN apt-get update \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
fakeroot \
gdb \
git \
gperf \
@ -94,7 +93,10 @@ RUN mkdir /tmp/ccache \
&& rm -rf /tmp/ccache
ARG TARGETARCH
ARG SCCACHE_VERSION=v0.4.1
ARG SCCACHE_VERSION=v0.5.4
ENV SCCACHE_IGNORE_SERVER_IO_ERROR=1
# sccache requires a value for the region. So by default we use The Default Region
ENV SCCACHE_REGION=us-east-1
RUN arch=${TARGETARCH:-amd64} \
&& case $arch in \
amd64) rarch=x86_64 ;; \


@ -33,6 +33,9 @@ then
elif [ "${ARCH}" = "powerpc64le" -o "${ARCH}" = "ppc64le" ]
then
DIR="powerpc64le"
elif [ "${ARCH}" = "riscv64" ]
then
DIR="riscv64"
fi
elif [ "${OS}" = "FreeBSD" ]
then


@ -0,0 +1,20 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v22.8.20.11-lts (c9ca79e24e8) FIXME as compared to v22.8.19.10-lts (989bc2fe8b0)
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix broken index analysis when binary operator contains a null constant argument [#50177](https://github.com/ClickHouse/ClickHouse/pull/50177) ([Amos Bird](https://github.com/amosbird)).
* Fix incorrect constant folding [#50536](https://github.com/ClickHouse/ClickHouse/pull/50536) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix fuzzer failure in ActionsDAG [#51301](https://github.com/ClickHouse/ClickHouse/pull/51301) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix segfault in MathUnary [#51499](https://github.com/ClickHouse/ClickHouse/pull/51499) ([Ilya Yatsishin](https://github.com/qoega)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Decoupled commits from [#51180](https://github.com/ClickHouse/ClickHouse/issues/51180) for backports [#51561](https://github.com/ClickHouse/ClickHouse/pull/51561) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).


@ -0,0 +1,25 @@
---
sidebar_position: 1
sidebar_label: 2023
---
# 2023 Changelog
### ClickHouse release v23.6.2.18-stable (89f39a7ccfe) FIXME as compared to v23.6.1.1524-stable (d1c7e13d088)
#### Build/Testing/Packaging Improvement
* Backported in [#51888](https://github.com/ClickHouse/ClickHouse/issues/51888): Update cargo dependencies. [#51721](https://github.com/ClickHouse/ClickHouse/pull/51721) ([Raúl Marín](https://github.com/Algunenano)).
#### Bug Fix (user-visible misbehavior in an official stable release)
* Fix reading from empty column in `parseSipHashKey` [#51804](https://github.com/ClickHouse/ClickHouse/pull/51804) ([Nikita Taranov](https://github.com/nickitat)).
* Allow parametric UDFs [#51964](https://github.com/ClickHouse/ClickHouse/pull/51964) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Remove the usage of Analyzer setting in the client [#51578](https://github.com/ClickHouse/ClickHouse/pull/51578) ([Alexey Milovidov](https://github.com/alexey-milovidov)).
* Fix 02116_tuple_element with Analyzer [#51669](https://github.com/ClickHouse/ClickHouse/pull/51669) ([Robert Schulze](https://github.com/rschu1ze)).
* Fix SQLLogic docker images [#51719](https://github.com/ClickHouse/ClickHouse/pull/51719) ([Antonio Andelic](https://github.com/antonio2368)).
* Fix source image for sqllogic [#51728](https://github.com/ClickHouse/ClickHouse/pull/51728) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).
* Pin for docker-ce [#51743](https://github.com/ClickHouse/ClickHouse/pull/51743) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).


@ -23,7 +23,7 @@ sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
``` bash
cd ClickHouse
mkdir build-riscv64
CC=clang-16 CXX=clang++-16 cmake . -Bbuild-riscv64 -G Ninja -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-riscv64.cmake -DGLIBC_COMPATIBILITY=OFF -DENABLE_LDAP=OFF -DOPENSSL_NO_ASM=ON -DENABLE_JEMALLOC=ON -DENABLE_PARQUET=OFF -DUSE_UNWIND=OFF -DENABLE_GRPC=OFF -DENABLE_HDFS=OFF -DENABLE_MYSQL=OFF
CC=clang-16 CXX=clang++-16 cmake . -Bbuild-riscv64 -G Ninja -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-riscv64.cmake -DGLIBC_COMPATIBILITY=OFF -DENABLE_LDAP=OFF -DOPENSSL_NO_ASM=ON -DENABLE_JEMALLOC=ON -DENABLE_PARQUET=OFF -DENABLE_GRPC=OFF -DENABLE_HDFS=OFF -DENABLE_MYSQL=OFF
ninja -C build-riscv64
```


@ -33,6 +33,15 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name
- `options` — MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
## Usage Example {#usage-example}
Create a table in ClickHouse which allows to read data from MongoDB collection:


@ -37,8 +37,8 @@ The [Merge](/docs/en/engines/table-engines/special/merge.md/#merge) engine does
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1] [TTL expr1],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2] [TTL expr2],
name1 [type1] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr1] [TTL expr1] [CODEC(codec1)] [[NOT] NULL|PRIMARY KEY],
name2 [type2] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr2] [TTL expr2] [CODEC(codec2)] [[NOT] NULL|PRIMARY KEY],
...
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],


@ -17,7 +17,8 @@ Default value: 0.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -30,7 +31,7 @@ insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
```sql
SELECT *
FROM table_1
SETTINGS additional_table_filters = (('table_1', 'x != 2'))
SETTINGS additional_table_filters = {'table_1': 'x != 2'}
```
```response
┌─x─┬─y────┐
@ -50,7 +51,8 @@ Default value: `''`.
**Example**
``` sql
insert into table_1 values (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
INSERT INTO table_1 VALUES (1, 'a'), (2, 'bb'), (3, 'ccc'), (4, 'dddd');
SELECT * FROM table_1;
```
```response
┌─x─┬─y────┐
@ -3201,6 +3203,40 @@ ENGINE = Log
└──────────────────────────────────────────────────────────────────────────┘
```
## default_temporary_table_engine {#default_temporary_table_engine}
Same as [default_table_engine](#default_table_engine) but for temporary tables.
Default value: `Memory`.
In this example, any new temporary table that does not specify an `Engine` will use the `Log` table engine:
Query:
```sql
SET default_temporary_table_engine = 'Log';
CREATE TEMPORARY TABLE my_table (
x UInt32,
y UInt32
);
SHOW CREATE TEMPORARY TABLE my_table;
```
Result:
```response
┌─statement────────────────────────────────────────────────────────────────┐
│ CREATE TEMPORARY TABLE default.my_table
(
`x` UInt32,
`y` UInt32
)
ENGINE = Log
└──────────────────────────────────────────────────────────────────────────┘
```
## data_type_default_nullable {#data_type_default_nullable}
Allows data types without explicit modifiers [NULL or NOT NULL](../../sql-reference/statements/create/table.md/#null-modifiers) in column definition will be [Nullable](../../sql-reference/data-types/nullable.md/#data_type-nullable).
@ -3501,7 +3537,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
## http_receive_timeout {#http_receive_timeout}
@ -3512,7 +3548,7 @@ Possible values:
- Any positive integer.
- 0 - Disabled (infinite timeout).
Default value: 180.
Default value: 30.
## check_query_single_value_result {#check_query_single_value_result}


@ -9,7 +9,6 @@ Columns:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds resolution.
- `name` ([String](../../sql-reference/data-types/string.md)) — Metric name.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — Metric value.
@ -20,18 +19,18 @@ SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**See Also**


@ -171,12 +171,13 @@ Result:
└──────────────────────────────┘
```
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type).
Executable user defined functions can take constant parameters configured in `command` setting (works only for user defined functions with `executable` type). It also requires the `execute_direct` option (to ensure no shell argument expansion vulnerability).
File `test_function_parameter_python.xml` (`/etc/clickhouse-server/test_function_parameter_python.xml` with default path settings).
```xml
<functions>
<function>
<type>executable</type>
<execute_direct>true</execute_direct>
<name>test_function_parameter_python</name>
<return_type>String</return_type>
<argument>


@ -30,6 +30,14 @@ mongodb(host:port, database, collection, user, password, structure [, options])
- `options` - MongoDB connection string options (optional parameter).
:::tip
If you are using the MongoDB Atlas cloud offering please add these options:
```
'connectTimeoutMS=10000&ssl=true&authSource=admin'
```
:::
**Returned Value**


@ -8,7 +8,6 @@ slug: /ru/operations/system-tables/asynchronous_metric_log
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата события.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — время события.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — время события в микросекундах.
- `name` ([String](../../sql-reference/data-types/string.md)) — название метрики.
- `value` ([Float64](../../sql-reference/data-types/float.md)) — значение метрики.


@ -3,13 +3,6 @@ slug: /zh/development/build
---
# 如何构建 ClickHouse 发布包 {#ru-he-gou-jian-clickhouse-fa-bu-bao}
## 安装 Git 和 Pbuilder {#an-zhuang-git-he-pbuilder}
``` bash
sudo apt-get update
sudo apt-get install git pbuilder debhelper lsb-release fakeroot sudo debian-archive-keyring debian-keyring
```
## 拉取 ClickHouse 源码 {#la-qu-clickhouse-yuan-ma}
``` bash


@ -8,7 +8,6 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
列:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — 事件日期。
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — 事件时间。
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — 事件时间(微秒)。
- `name` ([String](../../sql-reference/data-types/string.md)) — 指标名。
- `value` ([Float64](../../sql-reference/data-types/float.md)) — 指标值。
@ -17,18 +16,18 @@ slug: /zh/operations/system-tables/asynchronous_metric_log
SELECT * FROM system.asynchronous_metric_log LIMIT 10
```
``` text
┌─event_date─┬──────────event_time─┬────event_time_microseconds─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ 2020-09-05 15:56:30.025227 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴────────────────────────────┴──────────────────────────────────────────┴───────────┘
┌─event_date─┬──────────event_time─┬─name─────────────────────────────────────┬─────value─┐
│ 2020-09-05 │ 2020-09-05 15:56:30 │ CPUFrequencyMHz_0 │ 2120.9 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pmuzzy │ 743 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.arenas.all.pdirty │ 26288 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.run_intervals │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.background_thread.num_runs │ 0 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.retained │ 60694528 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.mapped │ 303161344 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.resident │ 260931584 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.metadata │ 12079488 │
│ 2020-09-05 │ 2020-09-05 15:56:30 │ jemalloc.allocated │ 133756128 │
└────────────┴─────────────────────┴──────────────────────────────────────────┴───────────┘
```
**另请参阅**


@ -59,7 +59,7 @@ public:
String relative_path_from = validatePathAndGetAsRelative(path_from);
String relative_path_to = validatePathAndGetAsRelative(path_to);
disk_from->copy(relative_path_from, disk_to, relative_path_to);
disk_from->copyDirectoryContent(relative_path_from, disk_to, relative_path_to);
}
};
}


@ -42,7 +42,7 @@ int mainEntryClickHouseKeeperConverter(int argc, char ** argv)
{
auto keeper_context = std::make_shared<KeeperContext>(true);
keeper_context->setDigestEnabled(true);
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>(), 0));
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("Keeper-snapshots", options["output-dir"].as<std::string>()));
DB::KeeperStorage storage(/* tick_time_ms */ 500, /* superdigest */ "", keeper_context, /* initialize_system_nodes */ false);


@ -485,7 +485,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(*servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(*servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to Keeper. But {} remain. Probably some users cannot finish their connections after context shutdown.", current_connections);


@ -75,6 +75,15 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
}
void applySettingsOverridesForLocal(ContextMutablePtr context)
{
Settings settings = context->getSettings();
settings.allow_introspection_functions = true;
settings.storage_file_read_method = LocalFSReadMethod::mmap;
context->setSettings(settings);
}
void LocalServer::processError(const String &) const
{
@ -668,6 +677,12 @@ void LocalServer::processConfig()
CompiledExpressionCacheFactory::instance().init(compiled_expression_cache_size, compiled_expression_cache_elements_size);
#endif
/// NOTE: it is important to apply any overrides before
/// setDefaultProfiles() calls since it will copy current context (i.e.
/// there is separate context for Buffer tables).
applySettingsOverridesForLocal(global_context);
applyCmdOptions(global_context);
/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
@ -682,7 +697,6 @@ void LocalServer::processConfig()
std::string default_database = config().getString("default_database", "_local");
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
global_context->setCurrentDatabase(default_database);
applyCmdOptions(global_context);
if (config().has("path"))
{


@ -1146,7 +1146,16 @@ try
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
if (merges_mutations_memory_usage_soft_limit == 0)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_INFO(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
else if (merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
@ -1523,7 +1532,7 @@ try
LOG_INFO(log, "Closed all listening sockets.");
if (current_connections > 0)
current_connections = waitServersToFinish(servers_to_start_before_tables, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers_to_start_before_tables, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_INFO(log, "Closed connections to servers for tables. But {} remain. Probably some tables of other users cannot finish their connections after context shutdown.", current_connections);
@ -1581,6 +1590,15 @@ try
/// After attaching system databases we can initialize system log.
global_context->initializeSystemLogs();
global_context->setSystemZooKeeperLogAfterInitializationIfNeeded();
/// Build loggers before tables startup to make log messages from tables
/// attach available in system.text_log
{
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
buildLoggers(config(), logger());
}
/// After the system database is created, attach virtual system tables (in addition to query_log and part_log)
attachSystemTablesServer(global_context, *database_catalog.getSystemDatabase(), has_zookeeper);
attachInformationSchema(global_context, *database_catalog.getDatabase(DatabaseCatalog::INFORMATION_SCHEMA));
@ -1609,7 +1627,7 @@ try
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if USE_UNWIND && !WITH_COVERAGE
#if !WITH_COVERAGE
/// Profilers cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
{
@ -1632,10 +1650,6 @@ try
/// Describe multiple reasons when query profiler cannot work.
#if !USE_UNWIND
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they cannot work without bundled unwind (stack unwinding) library.");
#endif
#if WITH_COVERAGE
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they work extremely slow with test coverage.");
#endif
@ -1707,14 +1721,6 @@ try
/// Must be done after initialization of `servers`, because async_metrics will access `servers` variable from its thread.
async_metrics.start();
{
String level_str = config().getString("text_log.level", "");
int level = level_str.empty() ? INT_MAX : Poco::Logger::parseLevel(level_str);
setTextLog(global_context->getTextLog(), level);
}
buildLoggers(config(), logger());
main_config_reloader->start();
access_control.startPeriodicReloading();
@ -1827,7 +1833,7 @@ try
global_context->getProcessList().killAllQueries();
if (current_connections)
current_connections = waitServersToFinish(servers, config().getInt("shutdown_wait_unfinished", 5));
current_connections = waitServersToFinish(servers, servers_lock, config().getInt("shutdown_wait_unfinished", 5));
if (current_connections)
LOG_WARNING(log, "Closed connections. But {} remain."


@ -155,7 +155,7 @@ namespace
AccessRightsElement::AccessRightsElement(AccessFlags access_flags_, std::string_view database_)
: access_flags(access_flags_), database(database_), any_database(false)
: access_flags(access_flags_), database(database_), parameter(database_), any_database(false), any_parameter(false)
{
}


@ -70,7 +70,7 @@ enum class AccessType
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_DATABASE_SETTINGS, "ALTER DATABASE SETTING, ALTER MODIFY DATABASE SETTING, MODIFY DATABASE SETTING", DATABASE, ALTER_DATABASE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute ALTER NAMED COLLECTION */\
M(ALTER_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute ALTER NAMED COLLECTION */\
\
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
@ -92,7 +92,7 @@ enum class AccessType
M(CREATE_ARBITRARY_TEMPORARY_TABLE, "", GLOBAL, CREATE) /* allows to create and manipulate temporary tables
with arbitrary table engine */\
M(CREATE_FUNCTION, "", GLOBAL, CREATE) /* allows to execute CREATE FUNCTION */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute CREATE NAMED COLLECTION */ \
M(CREATE, "", GROUP, ALL) /* allows to execute {CREATE|ATTACH} */ \
\
M(DROP_DATABASE, "", DATABASE, DROP) /* allows to execute {DROP|DETACH} DATABASE */\
@ -101,7 +101,7 @@ enum class AccessType
implicitly enabled by the grant DROP_TABLE */\
M(DROP_DICTIONARY, "", DICTIONARY, DROP) /* allows to execute {DROP|DETACH} DICTIONARY */\
M(DROP_FUNCTION, "", GLOBAL, DROP) /* allows to execute DROP FUNCTION */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) /* allows to execute DROP NAMED COLLECTION */\
M(DROP_NAMED_COLLECTION, "", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) /* allows to execute DROP NAMED COLLECTION */\
M(DROP, "", GROUP, ALL) /* allows to execute {DROP|DETACH} */\
\
M(UNDROP_TABLE, "", TABLE, ALL) /* allows to execute {UNDROP} TABLE */\
@ -140,9 +140,10 @@ enum class AccessType
M(SHOW_SETTINGS_PROFILES, "SHOW PROFILES, SHOW CREATE SETTINGS PROFILE, SHOW CREATE PROFILE", GLOBAL, SHOW_ACCESS) \
M(SHOW_ACCESS, "", GROUP, ACCESS_MANAGEMENT) \
M(ACCESS_MANAGEMENT, "", GROUP, ALL) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) \
M(SHOW_NAMED_COLLECTIONS_SECRETS, "SHOW NAMED COLLECTIONS SECRETS", NAMED_COLLECTION, NAMED_COLLECTION_CONTROL) \
M(NAMED_COLLECTION_CONTROL, "", NAMED_COLLECTION, ALL) \
M(SHOW_NAMED_COLLECTIONS, "SHOW NAMED COLLECTIONS", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(SHOW_NAMED_COLLECTIONS_SECRETS, "SHOW NAMED COLLECTIONS SECRETS", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(NAMED_COLLECTION, "NAMED COLLECTION USAGE, USE NAMED COLLECTION", NAMED_COLLECTION, NAMED_COLLECTION_ADMIN) \
M(NAMED_COLLECTION_ADMIN, "NAMED COLLECTION CONTROL", NAMED_COLLECTION, ALL) \
\
M(SYSTEM_SHUTDOWN, "SYSTEM KILL, SHUTDOWN", GLOBAL, SYSTEM) \
M(SYSTEM_DROP_DNS_CACHE, "SYSTEM DROP DNS, DROP DNS CACHE, DROP DNS", GLOBAL, SYSTEM_DROP_CACHE) \
@ -157,7 +158,6 @@ enum class AccessType
M(SYSTEM_DROP_CACHE, "DROP CACHE", GROUP, SYSTEM) \
M(SYSTEM_RELOAD_CONFIG, "RELOAD CONFIG", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_USERS, "RELOAD USERS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_SYMBOLS, "RELOAD SYMBOLS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_DICTIONARY, "SYSTEM RELOAD DICTIONARIES, RELOAD DICTIONARY, RELOAD DICTIONARIES", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_MODEL, "SYSTEM RELOAD MODELS, RELOAD MODEL, RELOAD MODELS", GLOBAL, SYSTEM_RELOAD) \
M(SYSTEM_RELOAD_FUNCTION, "SYSTEM RELOAD FUNCTIONS, RELOAD FUNCTION, RELOAD FUNCTIONS", GLOBAL, SYSTEM_RELOAD) \


@ -328,7 +328,7 @@ namespace
if (!named_collection_control)
{
user->access.revoke(AccessType::NAMED_COLLECTION_CONTROL);
user->access.revoke(AccessType::NAMED_COLLECTION_ADMIN);
}
if (!show_named_collections_secrets)


@ -53,7 +53,7 @@ TEST(AccessRights, Union)
"SHOW ROW POLICIES, SYSTEM MERGES, SYSTEM TTL MERGES, SYSTEM FETCHES, "
"SYSTEM MOVES, SYSTEM SENDS, SYSTEM REPLICATION QUEUES, "
"SYSTEM DROP REPLICA, SYSTEM SYNC REPLICA, SYSTEM RESTART REPLICA, "
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION CONTROL ON db1");
"SYSTEM RESTORE REPLICA, SYSTEM WAIT LOADING PARTS, SYSTEM SYNC DATABASE REPLICA, SYSTEM FLUSH DISTRIBUTED, dictGet ON db1.*, GRANT NAMED COLLECTION ADMIN ON db1");
}


@ -51,7 +51,8 @@ private:
T value = T{};
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -501,7 +502,8 @@ private:
char small_data[MAX_SMALL_STRING_SIZE]; /// Including the terminating zero.
public:
static constexpr bool is_nullable = false;
static constexpr bool result_is_nullable = false;
static constexpr bool should_skip_null_arguments = true;
static constexpr bool is_any = false;
bool has() const
@ -769,7 +771,7 @@ static_assert(
/// For any other value types.
template <bool IS_NULLABLE = false>
template <bool RESULT_IS_NULLABLE = false>
struct SingleValueDataGeneric
{
private:
@ -779,12 +781,13 @@ private:
bool has_value = false;
public:
static constexpr bool is_nullable = IS_NULLABLE;
static constexpr bool result_is_nullable = RESULT_IS_NULLABLE;
static constexpr bool should_skip_null_arguments = !RESULT_IS_NULLABLE;
static constexpr bool is_any = false;
bool has() const
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
return has_value;
return !value.isNull();
}
@ -820,14 +823,14 @@ public:
void change(const IColumn & column, size_t row_num, Arena *)
{
column.get(row_num, value);
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
void change(const Self & to, Arena *)
{
value = to.value;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
has_value = true;
}
@ -844,7 +847,7 @@ public:
bool changeFirstTime(const Self & to, Arena * arena)
{
if (!has() && (is_nullable || to.has()))
if (!has() && (result_is_nullable || to.has()))
{
change(to, arena);
return true;
@ -879,7 +882,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -910,7 +913,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!has())
{
@ -945,7 +948,7 @@ public:
}
else
{
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
Field new_value;
column.get(row_num, new_value);
@ -975,7 +978,7 @@ public:
{
if (!to.has())
return false;
if constexpr (is_nullable)
if constexpr (result_is_nullable)
{
if (!value.isNull() && (to.value.isNull() || value < to.value))
{
@ -1138,13 +1141,20 @@ struct AggregateFunctionAnyLastData : Data
#endif
};
/** The aggregate function 'singleValueOrNull' is used to implement subquery operators,
* such as x = ALL (SELECT ...)
* It checks if there is only one unique non-NULL value in the data.
* If there is only one unique value - returns it.
* If there are zero or at least two distinct values - returns NULL.
*/
template <typename Data>
struct AggregateFunctionSingleValueOrNullData : Data
{
static constexpr bool is_nullable = true;
using Self = AggregateFunctionSingleValueOrNullData;
static constexpr bool result_is_nullable = true;
bool first_value = true;
bool is_null = false;
@ -1166,7 +1176,7 @@ struct AggregateFunctionSingleValueOrNullData : Data
if (!to.has())
return;
if (first_value)
if (first_value && !to.first_value)
{
first_value = false;
this->change(to, arena);
@ -1311,7 +1321,7 @@ public:
static DataTypePtr createResultType(const DataTypePtr & type_)
{
if constexpr (Data::is_nullable)
if constexpr (Data::result_is_nullable)
return makeNullable(type_);
return type_;
}
@ -1431,13 +1441,13 @@ public:
}
AggregateFunctionPtr getOwnNullAdapter(
const AggregateFunctionPtr & nested_function,
const AggregateFunctionPtr & original_function,
const DataTypes & /*arguments*/,
const Array & /*params*/,
const AggregateFunctionProperties & /*properties*/) const override
{
if (Data::is_nullable)
return nested_function;
if (Data::result_is_nullable && !Data::should_skip_null_arguments)
return original_function;
return nullptr;
}
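As context for the `singleValueOrNull` comment added above, a minimal SQL sketch of the described semantics; the inline subqueries are hypothetical and are not taken from this commit:

```sql
-- Exactly one distinct non-NULL value: that value is returned.
SELECT singleValueOrNull(x) FROM (SELECT 1 AS x);

-- Two (or more) distinct values: NULL is returned instead.
SELECT singleValueOrNull(x) FROM (SELECT arrayJoin([1, 2]) AS x);
```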


@ -116,7 +116,6 @@ namespace ErrorCodes
extern const int UNKNOWN_TABLE;
extern const int ILLEGAL_COLUMN;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int FUNCTION_CANNOT_HAVE_PARAMETERS;
}
/** Query analyzer implementation overview. Please check documentation in QueryAnalysisPass.h first.
@ -4897,11 +4896,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
lambda_expression_untyped->formatASTForErrorMessage(),
scope.scope_node->formatASTForErrorMessage());
if (!parameters.empty())
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_node.formatASTForErrorMessage());
}
auto lambda_expression_clone = lambda_expression_untyped->clone();
IdentifierResolveScope lambda_scope(lambda_expression_clone, &scope /*parent_scope*/);
@ -5018,12 +5012,9 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
}
FunctionOverloadResolverPtr function = UserDefinedExecutableFunctionFactory::instance().tryGet(function_name, scope.context, parameters);
bool is_executable_udf = false;
if (!function)
function = FunctionFactory::instance().tryGet(function_name, scope.context);
else
is_executable_udf = true;
if (!function)
{
@ -5074,12 +5065,6 @@ ProjectionNames QueryAnalyzer::resolveFunction(QueryTreeNodePtr & node, Identifi
return result_projection_names;
}
/// Executable UDFs may have parameters. They are checked in UserDefinedExecutableFunctionFactory.
if (!parameters.empty() && !is_executable_udf)
{
throw Exception(ErrorCodes::FUNCTION_CANNOT_HAVE_PARAMETERS, "Function {} is not parametric", function_name);
}
/** For lambda arguments we need to initialize lambda argument types DataTypeFunction using `getLambdaArgumentTypes` function.
* Then each lambda arguments are initialized with columns, where column source is lambda.
* This information is important for later steps of query processing.


@ -253,6 +253,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
{
return std::make_unique<WriteBufferFromS3>(
client,
client, // already has long timeout
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,


@ -24,7 +24,7 @@ protected:
/// Make local disk.
temp_dir = std::make_unique<Poco::TemporaryFile>();
temp_dir->createDirectories();
local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/", 0);
local_disk = std::make_shared<DiskLocal>("local_disk", temp_dir->path() + "/");
/// Make encrypted disk.
auto settings = std::make_unique<DiskEncryptedSettings>();
@ -38,7 +38,7 @@ protected:
settings->current_key = key;
settings->current_key_fingerprint = fingerprint;
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings), true);
encrypted_disk = std::make_shared<DiskEncrypted>("encrypted_disk", std::move(settings));
}
void TearDown() override


@ -575,9 +575,11 @@ try
}
auto flags = O_WRONLY | O_EXCL;
if (query_with_output->is_outfile_append)
auto file_exists = fs::exists(out_file);
if (file_exists && query_with_output->is_outfile_append)
flags |= O_APPEND;
else if (query_with_output->is_outfile_truncate)
else if (file_exists && query_with_output->is_outfile_truncate)
flags |= O_TRUNC;
else
flags |= O_CREAT;


@ -107,8 +107,8 @@ struct FloatCompareHelper
}
};
template <class U> struct CompareHelper<Float32, U> : public FloatCompareHelper<Float32> {};
template <class U> struct CompareHelper<Float64, U> : public FloatCompareHelper<Float64> {};
template <typename U> struct CompareHelper<Float32, U> : public FloatCompareHelper<Float32> {};
template <typename U> struct CompareHelper<Float64, U> : public FloatCompareHelper<Float64> {};
/** A template for columns that use a simple array to store.


@ -8,7 +8,7 @@
* See also: https://gcc.gnu.org/legacy-ml/gcc-help/2017-12/msg00021.html
*/
#ifdef NDEBUG
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 64 * (1ULL << 20);
__attribute__((__weak__)) extern const size_t MMAP_THRESHOLD = 128 * (1ULL << 20);
#else
/**
* In debug build, use small mmap threshold to reproduce more memory


@ -93,8 +93,8 @@
M(ThreadPoolFSReaderThreadsActive, "Number of threads in the thread pool for local_filesystem_read_method=threadpool running a task.") \
M(BackupsIOThreads, "Number of threads in the BackupsIO thread pool.") \
M(BackupsIOThreadsActive, "Number of threads in the BackupsIO thread pool running a task.") \
M(DiskObjectStorageAsyncThreads, "Number of threads in the async thread pool for DiskObjectStorage.") \
M(DiskObjectStorageAsyncThreadsActive, "Number of threads in the async thread pool for DiskObjectStorage running a task.") \
M(DiskObjectStorageAsyncThreads, "Obsolete metric, shows nothing.") \
M(DiskObjectStorageAsyncThreadsActive, "Obsolete metric, shows nothing.") \
M(StorageHiveThreads, "Number of threads in the StorageHive thread pool.") \
M(StorageHiveThreadsActive, "Number of threads in the StorageHive thread pool running a task.") \
M(TablesLoaderThreads, "Number of threads in the tables loader thread pool.") \
@ -141,6 +141,8 @@
M(MergeTreeOutdatedPartsLoaderThreadsActive, "Number of active threads in the threadpool for loading Outdated data parts.") \
M(MergeTreePartsCleanerThreads, "Number of threads in the MergeTree parts cleaner thread pool.") \
M(MergeTreePartsCleanerThreadsActive, "Number of threads in the MergeTree parts cleaner thread pool running a task.") \
M(IDiskCopierThreads, "Number of threads for copying data between disks of different types.") \
M(IDiskCopierThreadsActive, "Number of threads for copying data between disks of different types running a task.") \
M(SystemReplicasThreads, "Number of threads in the system.replicas thread pool.") \
M(SystemReplicasThreadsActive, "Number of threads in the system.replicas thread pool running a task.") \
M(RestartReplicaThreads, "Number of threads in the RESTART REPLICA thread pool.") \


@ -418,6 +418,18 @@ PreformattedMessage getCurrentExceptionMessageAndPattern(bool with_stacktrace, b
<< " (version " << VERSION_STRING << VERSION_OFFICIAL << ")";
}
catch (...) {}
// #ifdef ABORT_ON_LOGICAL_ERROR
// try
// {
// throw;
// }
// catch (const std::logic_error &)
// {
// abortOnFailedAssertion(stream.str());
// }
// catch (...) {}
// #endif
}
catch (...)
{


@ -0,0 +1,42 @@
#pragma once
#include <Common/CurrentThread.h>
#include <Common/MemoryTracker.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
struct MemoryTrackerSwitcher
{
explicit MemoryTrackerSwitcher(MemoryTracker * new_tracker)
{
if (!current_thread)
throw Exception(ErrorCodes::LOGICAL_ERROR, "current_thread is not initialized");
auto * thread_tracker = CurrentThread::getMemoryTracker();
prev_untracked_memory = current_thread->untracked_memory;
prev_memory_tracker_parent = thread_tracker->getParent();
current_thread->untracked_memory = 0;
thread_tracker->setParent(new_tracker);
}
~MemoryTrackerSwitcher()
{
CurrentThread::flushUntrackedMemory();
auto * thread_tracker = CurrentThread::getMemoryTracker();
current_thread->untracked_memory = prev_untracked_memory;
thread_tracker->setParent(prev_memory_tracker_parent);
}
MemoryTracker * prev_memory_tracker_parent = nullptr;
Int64 prev_untracked_memory = 0;
};
}
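A brief usage sketch of the RAII `MemoryTrackerSwitcher` added above; `other_tracker` and the include path are assumptions for illustration, not code from this commit:

```cpp
#include <Common/MemoryTrackerSwitcher.h>   // assumed location of the new header

void runWithSeparateAccounting(MemoryTracker & other_tracker)
{
    // Requires current_thread to be initialized (the constructor throws otherwise).
    // While the switcher is alive, allocations on this thread are charged to other_tracker.
    DB::MemoryTrackerSwitcher switcher(&other_tracker);
    // ... allocating work here ...
}   // destructor flushes untracked memory and restores the previous parent tracker
```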


@ -1,9 +1,11 @@
#pragma once
#include <mutex>
#include <condition_variable>
#include <Poco/Timespan.h>
#include <mutex>
#include <type_traits>
#include <variant>
#include <boost/noncopyable.hpp>
#include <Poco/Timespan.h>
#include <Common/logger_useful.h>
#include <Common/Exception.h>
@ -15,14 +17,6 @@ namespace ProfileEvents
extern const Event ConnectionPoolIsFullMicroseconds;
}
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
/** A class from which you can inherit and get a pool of something. Used for database connection pools.
* Descendant class must provide a method for creating a new object to place in the pool.
*/
@ -35,6 +29,22 @@ public:
using ObjectPtr = std::shared_ptr<Object>;
using Ptr = std::shared_ptr<PoolBase<TObject>>;
enum class BehaviourOnLimit
{
/**
* Default behaviour - when limit on pool size is reached, callers will wait until object will be returned back in pool.
*/
Wait,
/**
* If there are no free objects in the pool, allocate a new object, but do not store it in the pool.
* This behaviour is needed when we simply don't want to waste time waiting, or when we cannot guarantee that a query can be processed using a fixed number of connections.
* For example, when we read from a table on S3, one GetObject request corresponds to a whole FileSystemCache segment. These segments are shared between different
* reading tasks, so in the general case a connection can be taken from the pool by one task and returned by another one, and these tasks are processed completely independently.
*/
AllocateNewBypassingPool,
};
private:
/** The object with the flag, whether it is currently used. */
@ -89,37 +99,53 @@ public:
Object & operator*() && = delete;
const Object & operator*() const && = delete;
Object * operator->() & { return &*data->data.object; }
const Object * operator->() const & { return &*data->data.object; }
Object & operator*() & { return *data->data.object; }
const Object & operator*() const & { return *data->data.object; }
Object * operator->() & { return castToObjectPtr(); }
const Object * operator->() const & { return castToObjectPtr(); }
Object & operator*() & { return *castToObjectPtr(); }
const Object & operator*() const & { return *castToObjectPtr(); }
/**
* Expire an object to make it reallocated later.
*/
void expire()
{
data->data.is_expired = true;
if (data.index() == 1)
std::get<1>(data)->data.is_expired = true;
}
bool isNull() const { return data == nullptr; }
PoolBase * getPool() const
{
if (!data)
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "Attempt to get pool from uninitialized entry");
return &data->data.pool;
}
bool isNull() const { return data.index() == 0 ? !std::get<0>(data) : !std::get<1>(data); }
private:
std::shared_ptr<PoolEntryHelper> data;
/**
* Plain object will be stored instead of PoolEntryHelper if fallback was made in get() (see BehaviourOnLimit::AllocateNewBypassingPool).
*/
std::variant<ObjectPtr, std::shared_ptr<PoolEntryHelper>> data;
explicit Entry(ObjectPtr && object) : data(std::move(object)) { }
explicit Entry(PooledObject & object) : data(std::make_shared<PoolEntryHelper>(object)) { }
auto castToObjectPtr() const
{
return std::visit(
[](const auto & ptr)
{
using T = std::decay_t<decltype(ptr)>;
if constexpr (std::is_same_v<ObjectPtr, T>)
return ptr.get();
else
return ptr->data.object.get();
},
data);
}
};
virtual ~PoolBase() = default;
/** Allocates the object. Wait for free object in pool for 'timeout'. With 'timeout' < 0, the timeout is infinite. */
/** Allocates the object.
* If 'behaviour_on_limit' is Wait, wait for a free object in the pool for 'timeout'. With 'timeout' < 0, the timeout is infinite.
* If 'behaviour_on_limit' is AllocateNewBypassingPool and there is no free object, a new object will be created but not stored in the pool.
*/
Entry get(Poco::Timespan::TimeDiff timeout)
{
std::unique_lock lock(mutex);
@ -150,6 +176,9 @@ public:
return Entry(*items.back());
}
if (behaviour_on_limit == BehaviourOnLimit::AllocateNewBypassingPool)
return Entry(allocObject());
Stopwatch blocked;
if (timeout < 0)
{
@ -184,6 +213,8 @@ private:
/** The maximum size of the pool. */
unsigned max_items;
BehaviourOnLimit behaviour_on_limit;
/** Pool. */
Objects items;
@ -192,11 +223,10 @@ private:
std::condition_variable available;
protected:
Poco::Logger * log;
PoolBase(unsigned max_items_, Poco::Logger * log_)
: max_items(max_items_), log(log_)
PoolBase(unsigned max_items_, Poco::Logger * log_, BehaviourOnLimit behaviour_on_limit_ = BehaviourOnLimit::Wait)
: max_items(max_items_), behaviour_on_limit(behaviour_on_limit_), log(log_)
{
items.reserve(max_items);
}
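
A hedged sketch of how a descendant pool might opt into the BehaviourOnLimit::AllocateNewBypassingPool mode introduced above (the MyConnection type and the pool name are assumptions made purely for illustration):

/// Hypothetical descendant: when all max_items objects are in use, get()
/// allocates an extra connection that bypasses the pool instead of blocking.
class MyConnectionPool : public PoolBase<MyConnection>
{
public:
    MyConnectionPool(unsigned max_items_, Poco::Logger * log_)
        : PoolBase<MyConnection>(max_items_, log_, BehaviourOnLimit::AllocateNewBypassingPool)
    {
    }

protected:
    ObjectPtr allocObject() override { return std::make_shared<MyConnection>(); }
};

/// Entries obtained when the limit is hit are simply destroyed on release,
/// while pooled entries are returned to the pool:
/// auto entry = my_pool.get(/* timeout = */ -1);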

View File

@ -368,6 +368,10 @@ The server successfully detected this situation and will download merged part fr
M(ReadBufferFromS3InitMicroseconds, "Time spent initializing connection to S3.") \
M(ReadBufferFromS3Bytes, "Bytes read from S3.") \
M(ReadBufferFromS3RequestsErrors, "Number of exceptions while reading from S3.") \
M(ReadBufferFromS3ResetSessions, "Number of HTTP sessions that were reset in ReadBufferFromS3.") \
M(ReadBufferFromS3PreservedSessions, "Number of HTTP sessions that were preserved in ReadBufferFromS3.") \
\
M(ReadWriteBufferFromHTTPPreservedSessions, "Number of HTTP sessions that were preserved in ReadWriteBufferFromHTTP.") \
\
M(WriteBufferFromS3Microseconds, "Time spent on writing to S3.") \
M(WriteBufferFromS3Bytes, "Bytes written to S3.") \

View File

@ -91,7 +91,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
#if USE_UNWIND
#ifndef __APPLE__
Timer::Timer()
: log(&Poco::Logger::get("Timer"))
{}
@ -120,6 +120,15 @@ void Timer::createIfNecessary(UInt64 thread_id, int clock_type, int pause_signal
throw Exception(ErrorCodes::CANNOT_CREATE_TIMER, "Failed to create thread timer. The function "
"'timer_create' returned non-zero but didn't set errno. This is a bug in your OS.");
/// For example, it cannot be created if the server is run under QEMU:
/// "Failed to create thread timer, errno: 11, strerror: Resource temporarily unavailable."
/// You could accidentally run the server under QEMU without being aware,
/// if you use Docker image for a different architecture,
/// and you have the "binfmt-misc" kernel module, and "qemu-user" tools.
/// Also, it cannot be created if the server has too many threads.
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
}
timer_id.emplace(local_timer_id);
@ -200,13 +209,13 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(UInt64 thread_id, int clock_t
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler disabled because they cannot work under sanitizers");
#elif !USE_UNWIND
#elif defined(__APPLE__)
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work with stock libunwind");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "QueryProfiler cannot work on OSX");
#else
/// Sanity check.
if (!hasPHDRCache())
@ -255,7 +264,7 @@ QueryProfilerBase<ProfilerImpl>::~QueryProfilerBase()
template <typename ProfilerImpl>
void QueryProfilerBase<ProfilerImpl>::cleanup()
{
#if USE_UNWIND
#ifndef __APPLE__
timer.stop();
signal_handler_disarmed = true;
#endif

View File

@ -28,7 +28,7 @@ namespace DB
* Note that signal handler implementation is defined by template parameter. See QueryProfilerReal and QueryProfilerCPU.
*/
#if USE_UNWIND
#ifndef __APPLE__
class Timer
{
public:
@ -60,7 +60,7 @@ private:
Poco::Logger * log;
#if USE_UNWIND
#ifndef __APPLE__
inline static thread_local Timer timer = Timer();
#endif

View File

@ -20,13 +20,10 @@
#include <sstream>
#include <unordered_map>
#include <fmt/format.h>
#include <libunwind.h>
#include "config.h"
#if USE_UNWIND
# include <libunwind.h>
#endif
namespace
{
/// Currently this variable is set up once on server startup.
@ -287,12 +284,8 @@ StackTrace::StackTrace(const ucontext_t & signal_context)
void StackTrace::tryCapture()
{
#if USE_UNWIND
size = unw_backtrace(frame_pointers.data(), capacity);
__msan_unpoison(frame_pointers.data(), size * sizeof(frame_pointers[0]));
#else
size = 0;
#endif
}
/// ClickHouse uses bundled libc++ so type names will be the same on every system thus it's safe to hardcode them

View File

@ -793,88 +793,6 @@ public:
}
};
// Searches for needle surrounded by token-separators.
// Separators are anything inside ASCII (0-128) and not alphanum.
// Any value outside of basic ASCII (>=128) is considered a non-separator symbol, hence UTF-8 strings
// should work just fine. But any Unicode whitespace is not considered a token separator.
template <typename StringSearcher>
class TokenSearcher : public StringSearcherBase
{
StringSearcher searcher;
size_t needle_size;
public:
template <typename CharT>
requires (sizeof(CharT) == 1)
static bool isValidNeedle(const CharT * needle_, size_t needle_size_)
{
return std::none_of(needle_, needle_ + needle_size_, isTokenSeparator);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
TokenSearcher(const CharT * needle_, size_t needle_size_)
: searcher(needle_, needle_size_)
, needle_size(needle_size_)
{
/// The caller is responsible for calling isValidNeedle()
chassert(isValidNeedle(needle_, needle_size_));
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool compare(const CharT * haystack, const CharT * haystack_end, const CharT * pos) const
{
// use searcher only if pos is in the beginning of token and pos + searcher.needle_size is end of token.
if (isToken(haystack, haystack_end, pos))
return searcher.compare(haystack, haystack_end, pos);
return false;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, const CharT * const haystack_end) const
{
// use searcher.search(), then verify that returned value is a token
// if it is not, skip it and re-run
const auto * pos = haystack;
while (pos < haystack_end)
{
pos = searcher.search(pos, haystack_end);
if (pos == haystack_end || isToken(haystack, haystack_end, pos))
return pos;
// assuming that the needle does not contain any token separators.
pos += needle_size;
}
return haystack_end;
}
template <typename CharT>
requires (sizeof(CharT) == 1)
const CharT * search(const CharT * haystack, size_t haystack_size) const
{
return search(haystack, haystack + haystack_size);
}
template <typename CharT>
requires (sizeof(CharT) == 1)
ALWAYS_INLINE bool isToken(const CharT * haystack, const CharT * const haystack_end, const CharT* p) const
{
return (p == haystack || isTokenSeparator(*(p - 1)))
&& (p + needle_size >= haystack_end || isTokenSeparator(*(p + needle_size)));
}
ALWAYS_INLINE static bool isTokenSeparator(const uint8_t c)
{
return !(isAlphaNumericASCII(c) || !isASCII(c));
}
};
}
using ASCIICaseSensitiveStringSearcher = impl::StringSearcher<true, true>;
@ -882,9 +800,6 @@ using ASCIICaseInsensitiveStringSearcher = impl::StringSearcher<false, true>;
using UTF8CaseSensitiveStringSearcher = impl::StringSearcher<true, false>;
using UTF8CaseInsensitiveStringSearcher = impl::StringSearcher<false, false>;
using ASCIICaseSensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseSensitiveStringSearcher>;
using ASCIICaseInsensitiveTokenSearcher = impl::TokenSearcher<ASCIICaseInsensitiveStringSearcher>;
/// Use only with short haystacks where cheap initialization is required.
template <bool CaseInsensitive>
struct StdLibASCIIStringSearcher

View File

@ -9,7 +9,6 @@
#include <link.h>
//#include <iostream>
#include <filesystem>
#include <base/sort.h>
@ -561,13 +560,6 @@ MultiVersion<SymbolIndex>::Version SymbolIndex::instance()
return instanceImpl().get();
}
void SymbolIndex::reload()
{
instanceImpl().set(std::unique_ptr<SymbolIndex>(new SymbolIndex));
/// Also drop stacktrace cache.
StackTrace::dropCache();
}
}
#endif

View File

@ -24,7 +24,6 @@ protected:
public:
static MultiVersion<SymbolIndex>::Version instance();
static void reload();
struct Symbol
{

View File

@ -199,13 +199,14 @@ ThreadStatus::~ThreadStatus()
if (deleter)
deleter();
chassert(!check_current_thread_on_destruction || current_thread == this);
/// Only change current_thread if it's currently being used by this ThreadStatus
/// For example, PushingToViews chain creates and deletes ThreadStatus instances while running in the main query thread
if (check_current_thread_on_destruction)
{
assert(current_thread == this);
if (current_thread == this)
current_thread = nullptr;
}
else if (check_current_thread_on_destruction)
LOG_ERROR(log, "current_thread contains invalid address");
}
void ThreadStatus::updatePerformanceCounters()

View File

@ -730,9 +730,6 @@ using VolnitskyUTF8 = VolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher
using VolnitskyCaseInsensitive = VolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>; /// ignores non-ASCII bytes
using VolnitskyCaseInsensitiveUTF8 = VolnitskyBase<false, false, UTF8CaseInsensitiveStringSearcher>;
using VolnitskyCaseSensitiveToken = VolnitskyBase<true, true, ASCIICaseSensitiveTokenSearcher>;
using VolnitskyCaseInsensitiveToken = VolnitskyBase<false, true, ASCIICaseInsensitiveTokenSearcher>;
using MultiVolnitsky = MultiVolnitskyBase<true, true, ASCIICaseSensitiveStringSearcher>;
using MultiVolnitskyUTF8 = MultiVolnitskyBase<true, false, UTF8CaseSensitiveStringSearcher>;
using MultiVolnitskyCaseInsensitive = MultiVolnitskyBase<false, true, ASCIICaseInsensitiveStringSearcher>;

View File

@ -9,7 +9,6 @@
#cmakedefine01 USE_AWS_S3
#cmakedefine01 USE_AZURE_BLOB_STORAGE
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_UNWIND
#cmakedefine01 USE_CASSANDRA
#cmakedefine01 USE_SENTRY
#cmakedefine01 USE_GRPC

View File

@ -220,7 +220,7 @@ KeeperContext::Storage KeeperContext::getLogsPathFromConfig(const Poco::Util::Ab
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalLogDisk", path, 0);
return std::make_shared<DiskLocal>("LocalLogDisk", path);
};
/// the most specialized path
@ -246,7 +246,7 @@ KeeperContext::Storage KeeperContext::getSnapshotsPathFromConfig(const Poco::Uti
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path, 0);
return std::make_shared<DiskLocal>("LocalSnapshotDisk", path);
};
/// the most specialized path
@ -272,7 +272,7 @@ KeeperContext::Storage KeeperContext::getStatePathFromConfig(const Poco::Util::A
if (!fs::exists(path))
fs::create_directories(path);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path, 0);
return std::make_shared<DiskLocal>("LocalStateFileDisk", path);
};
if (config.has("keeper_server.state_storage_disk"))

View File

@ -145,14 +145,14 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const SnapshotFileInfo & snapsh
const auto create_writer = [&](const auto & key)
{
return WriteBufferFromS3
{
return WriteBufferFromS3(
s3_client->client,
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
};
);
};
LOG_INFO(log, "Will try to upload snapshot on {} to S3", snapshot_file_info.path);

View File

@ -71,16 +71,16 @@ protected:
DB::KeeperContextPtr keeper_context = std::make_shared<DB::KeeperContext>(true);
Poco::Logger * log{&Poco::Logger::get("CoordinationTest")};
void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path, 0)); }
void setLogDirectory(const std::string & path) { keeper_context->setLogDisk(std::make_shared<DB::DiskLocal>("LogDisk", path)); }
void setSnapshotDirectory(const std::string & path)
{
keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path, 0));
keeper_context->setSnapshotDisk(std::make_shared<DB::DiskLocal>("SnapshotDisk", path));
}
void setStateFileDirectory(const std::string & path)
{
keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path, 0));
keeper_context->setStateFileDisk(std::make_shared<DB::DiskLocal>("StateFile", path));
}
};
@ -1503,9 +1503,9 @@ void testLogAndStateMachine(
using namespace DB;
ChangelogDirTest snapshots("./snapshots");
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots", 0));
keeper_context->setSnapshotDisk(std::make_shared<DiskLocal>("SnapshotDisk", "./snapshots"));
ChangelogDirTest logs("./logs");
keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs", 0));
keeper_context->setLogDisk(std::make_shared<DiskLocal>("LogDisk", "./logs"));
ResponsesQueue queue(std::numeric_limits<size_t>::max());
SnapshotsQueue snapshots_queue{1};

View File

@ -41,7 +41,7 @@
/// The boundary on which the blocks for asynchronous file operations should be aligned.
#define DEFAULT_AIO_FILE_BLOCK_SIZE 4096
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 180
#define DEFAULT_HTTP_READ_BUFFER_TIMEOUT 30
#define DEFAULT_HTTP_READ_BUFFER_CONNECTION_TIMEOUT 1
/// Maximum number of http-connections between two endpoints
/// the number is unmotivated

View File

@ -102,6 +102,7 @@ class IColumn;
M(Bool, s3_allow_parallel_part_upload, true, "Use multiple threads for s3 multipart upload. It may lead to slightly higher memory usage", 0) \
M(Bool, s3_throw_on_zero_files_match, false, "Throw an error, when ListObjects request cannot match any files", 0) \
M(UInt64, s3_retry_attempts, 10, "Setting for Aws::Client::RetryStrategy, Aws::Client does retries itself, 0 means no retries", 0) \
M(UInt64, s3_request_timeout_ms, 3000, "Idleness timeout for sending and receiving data to/from S3. Fail if a single TCP read or write call blocks for this long.", 0) \
M(Bool, enable_s3_requests_logging, false, "Enable very explicit logging of S3 requests. Makes sense for debug only.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(Bool, hdfs_truncate_on_insert, false, "Enables or disables truncate before insert in s3 engine tables", 0) \
@ -517,6 +518,7 @@ class IColumn;
M(Seconds, wait_for_window_view_fire_signal_timeout, 10, "Timeout for waiting for window view fire signal in event time processing", 0) \
M(UInt64, min_free_disk_space_for_temporary_data, 0, "The minimum disk space to keep while writing temporary data used in external sorting and aggregation.", 0) \
\
M(DefaultTableEngine, default_temporary_table_engine, DefaultTableEngine::Memory, "Default table engine used when ENGINE is not set in CREATE TEMPORARY statement.",0) \
M(DefaultTableEngine, default_table_engine, DefaultTableEngine::None, "Default table engine used when ENGINE is not set in CREATE statement.",0) \
M(Bool, show_table_uuid_in_table_create_query_if_not_nil, false, "For tables in databases with Engine=Atomic show UUID of the table in its CREATE query.", 0) \
M(Bool, database_atomic_wait_for_drop_and_detach_synchronously, false, "When executing DROP or DETACH TABLE in Atomic database, wait for table data to be finally dropped or detached.", 0) \
@ -658,7 +660,7 @@ class IColumn;
M(UInt64, function_range_max_elements_in_block, 500000000, "Maximum number of values generated by function 'range' per block of data (sum of array sizes for every row in a block, see also 'max_block_size' and 'min_insert_block_size_rows'). It is a safety threshold.", 0) \
M(ShortCircuitFunctionEvaluation, short_circuit_function_evaluation, ShortCircuitFunctionEvaluation::ENABLE, "Setting for short-circuit function evaluation configuration. Possible values: 'enable' - use short-circuit function evaluation for functions that are suitable for it, 'disable' - disable short-circuit function evaluation, 'force_enable' - use short-circuit function evaluation for all functions.", 0) \
\
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::mmap, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(LocalFSReadMethod, storage_file_read_method, LocalFSReadMethod::pread, "Method of reading data from storage file, one of: read, pread, mmap. The mmap method does not apply to clickhouse-server (it's intended for clickhouse-local).", 0) \
M(String, local_filesystem_read_method, "pread_threadpool", "Method of reading data from local filesystem, one of: read, pread, mmap, io_uring, pread_threadpool. The 'io_uring' method is experimental and does not work for Log, TinyLog, StripeLog, File, Set and Join, and other tables with append-able files in presence of concurrent reads and writes.", 0) \
M(String, remote_filesystem_read_method, "threadpool", "Method of reading data from remote filesystem, one of: read, threadpool.", 0) \
M(Bool, local_filesystem_read_prefetch, false, "Should use prefetching when reading data from local filesystem.", 0) \

View File

@ -80,6 +80,8 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"use_with_fill_by_sorting_prefix", false, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently"},

View File

@ -154,7 +154,7 @@ static void signalHandler(int sig, siginfo_t * info, void * context)
writePODBinary(*info, out);
writePODBinary(signal_context, out);
writePODBinary(stack_trace, out);
writeVectorBinary(Exception::thread_frame_pointers, out);
writeVectorBinary(Exception::enable_job_stack_trace ? Exception::thread_frame_pointers : std::vector<StackTrace::FramePointers>{}, out);
writeBinary(static_cast<UInt32>(getThreadId()), out);
writePODBinary(current_thread, out);
@ -310,6 +310,57 @@ private:
{
ThreadStatus thread_status;
/// First log those fields that are safe to access and that should not cause a new fault.
/// That way we will have some duplicated info in the log, but we don't lose important info
/// in case of a double fault.
LOG_FATAL(log, "########## Short fault info ############");
LOG_FATAL(log, "(version {}{}, build id: {}, git hash: {}) (from thread {}) Received signal {}",
VERSION_STRING, VERSION_OFFICIAL, daemon.build_id, daemon.git_hash,
thread_num, sig);
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "Signal description: {}", signal_description);
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
String bare_stacktrace_str;
if (stack_trace.getSize())
{
/// Write the bare stack trace (addresses) just in case we fail to print the symbolized stack trace.
/// NOTE: This still requires memory allocations and a mutex lock inside the logger.
/// BTW, we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
bare_stacktrace_str = bare_stacktrace.str();
}
/// Now try to access potentially unsafe data in thread_ptr.
String query_id;
String query;
@ -326,16 +377,6 @@ private:
}
}
std::string signal_description = "Unknown signal";
/// Some of these are not really signals, but our own indications on failure reason.
if (sig == StdTerminate)
signal_description = "std::terminate";
else if (sig == SanitizerTrap)
signal_description = "sanitizer trap";
else if (sig >= 0)
signal_description = strsignal(sig); // NOLINT(concurrency-mt-unsafe) // it is not thread-safe but ok in this context
LOG_FATAL(log, "########################################");
if (query_id.empty())
@ -351,30 +392,11 @@ private:
thread_num, query_id, query, signal_description, sig);
}
String error_message;
if (sig != SanitizerTrap)
error_message = signalToErrorMessage(sig, info, *context);
else
error_message = "Sanitizer trap.";
LOG_FATAL(log, fmt::runtime(error_message));
if (stack_trace.getSize())
if (!bare_stacktrace_str.empty())
{
/// Write the bare stack trace (addresses) just in case we fail to print the symbolized stack trace.
/// NOTE: This still requires memory allocations and a mutex lock inside the logger.
/// BTW, we can also print it to stderr using write syscalls.
WriteBufferFromOwnString bare_stacktrace;
writeString("Stack trace:", bare_stacktrace);
for (size_t i = stack_trace.getOffset(); i < stack_trace.getSize(); ++i)
{
writeChar(' ', bare_stacktrace);
writePointerHex(stack_trace.getFramePointers()[i], bare_stacktrace);
}
LOG_FATAL(log, fmt::runtime(bare_stacktrace.str()));
LOG_FATAL(log, fmt::runtime(bare_stacktrace_str));
}
/// Write symbolized stack trace line by line for better grep-ability.
@ -1101,6 +1123,7 @@ void BaseDaemon::setupWatchdog()
if (0 == pid)
{
updateCurrentThreadIdAfterFork();
logger().information("Forked a child process to watch");
#if defined(OS_LINUX)
if (0 != prctl(PR_SET_PDEATHSIG, SIGKILL))

View File

@ -13,6 +13,7 @@
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCreateQuery.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Parsers/ASTCreateQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ParserCreateQuery.h>
@ -182,6 +183,7 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
auto ast = parseQueryFromMetadata(log, getContext(), full_path.string(), /*throw_on_error*/ true, /*remove_empty*/ false);
if (ast)
{
FunctionNameNormalizer().visit(ast.get());
auto * create_query = ast->as<ASTCreateQuery>();
/// NOTE No concurrent writes are possible during database loading
create_query->setDatabase(TSA_SUPPRESS_WARNING_FOR_READ(database_name));

View File

@ -549,16 +549,17 @@ void CacheDictionary<dictionary_key_type>::update(CacheDictionaryUpdateUnitPtr<d
for (size_t i = 0; i < key_index_to_state_from_storage.size(); ++i)
{
if (key_index_to_state_from_storage[i].isExpired()
|| key_index_to_state_from_storage[i].isNotFound())
if (key_index_to_state_from_storage[i].isExpired() || key_index_to_state_from_storage[i].isNotFound())
{
auto requested_key = requested_keys[i];
auto [_, inserted] = not_found_keys.insert(requested_key);
if (inserted)
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
requested_keys_vector.emplace_back(requested_keys[i]);
else
requested_complex_key_rows.emplace_back(i);
auto requested_key = requested_keys[i];
not_found_keys.insert(requested_key);
}
}
}

View File

@ -217,7 +217,7 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::optional<Configuration> configuration;
std::string settings_config_prefix = config_prefix + ".clickhouse";
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{

View File

@ -71,7 +71,7 @@ void registerDictionarySourceMysql(DictionarySourceFactory & factory)
MySQLSettings mysql_settings;
std::optional<MySQLDictionarySource::Configuration> dictionary_configuration;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix) : nullptr;
auto named_collection = created_from_ddl ? tryGetNamedCollectionWithOverrides(config, settings_config_prefix, global_context) : nullptr;
if (named_collection)
{
auto allowed_arguments{dictionary_allowed_keys};

View File

@ -266,7 +266,7 @@ public:
}
UInt64 getSize() const override { return reservation->getSize(); }
UInt64 getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
std::optional<UInt64> getUnreservedSpace() const override { return reservation->getUnreservedSpace(); }
DiskPtr getDisk(size_t i) const override
{
@ -285,19 +285,32 @@ private:
};
DiskEncrypted::DiskEncrypted(
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_)
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), use_fake_transaction_)
const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_)
: DiskEncrypted(name_, parseDiskEncryptedSettings(name_, config_, config_prefix_, map_), config_, config_prefix_)
{
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_)
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_)
: IDisk(name_, config_, config_prefix_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
, use_fake_transaction(config_.getBool(config_prefix_ + ".use_fake_transaction", true))
{
delegate->createDirectories(disk_path);
}
DiskEncrypted::DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_)
: IDisk(name_)
, delegate(settings_->wrapped_disk)
, encrypted_name(name_)
, disk_path(settings_->disk_path)
, disk_absolute_path(settings_->wrapped_disk->getPath() + settings_->disk_path)
, current_settings(std::move(settings_))
, use_fake_transaction(use_fake_transaction_)
, use_fake_transaction(true)
{
delegate->createDirectories(disk_path);
}
@ -310,32 +323,6 @@ ReservationPtr DiskEncrypted::reserve(UInt64 bytes)
return std::make_unique<DiskEncryptedReservation>(std::static_pointer_cast<DiskEncrypted>(shared_from_this()), std::move(reservation));
}
void DiskEncrypted::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
/// Check if we can copy the file without deciphering.
if (isSameDiskType(*this, *to_disk))
{
/// Disk type is the same, check if the key is the same too.
if (auto * to_disk_enc = typeid_cast<DiskEncrypted *>(to_disk.get()))
{
auto from_settings = current_settings.get();
auto to_settings = to_disk_enc->current_settings.get();
if (from_settings->all_keys == to_settings->all_keys)
{
/// Keys are the same so we can simply copy the encrypted file.
auto wrapped_from_path = wrappedPath(from_path);
auto to_delegate = to_disk_enc->delegate;
auto wrapped_to_path = to_disk_enc->wrappedPath(to_path);
delegate->copy(wrapped_from_path, to_delegate, wrapped_to_path);
return;
}
}
}
/// Copy the file through buffers with deciphering.
copyThroughBuffers(from_path, to_disk, to_path);
}
void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
@ -359,11 +346,8 @@ void DiskEncrypted::copyDirectoryContent(const String & from_dir, const std::sha
}
}
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);
/// Copy the file through buffers with deciphering.
copyThroughBuffers(from_dir, to_disk, to_dir);
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
std::unique_ptr<ReadBufferFromFileBase> DiskEncrypted::readFile(
@ -443,7 +427,7 @@ std::unordered_map<String, String> DiskEncrypted::getSerializedMetadata(const st
void DiskEncrypted::applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
ContextPtr /*context*/,
ContextPtr context,
const String & config_prefix,
const DisksMap & disk_map)
{
@ -455,6 +439,7 @@ void DiskEncrypted::applyNewSettings(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Changing disk path on the fly is not supported. Disk {}", name);
current_settings.set(std::move(new_settings));
IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
@ -467,7 +452,7 @@ void registerDiskEncrypted(DiskFactory & factory, bool global_skip_access_check)
const DisksMap & map) -> DiskPtr
{
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map, config.getBool(config_prefix + ".use_fake_transaction", true));
DiskPtr disk = std::make_shared<DiskEncrypted>(name, config, config_prefix, map);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -21,8 +21,10 @@ class WriteBufferFromFileBase;
class DiskEncrypted : public IDisk
{
public:
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_, bool use_fake_transaction_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_, bool use_fake_transaction_);
DiskEncrypted(const String & name_, const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_, const DisksMap & map_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_,
const Poco::Util::AbstractConfiguration & config_, const String & config_prefix_);
DiskEncrypted(const String & name_, std::unique_ptr<const DiskEncryptedSettings> settings_);
const String & getName() const override { return encrypted_name; }
const String & getPath() const override { return disk_absolute_path; }
@ -110,8 +112,6 @@ public:
delegate->listFiles(wrapped_path, file_names);
}
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
std::unique_ptr<ReadBufferFromFileBase> readFile(
@ -312,17 +312,17 @@ public:
}
}
UInt64 getTotalSpace() const override
std::optional<UInt64> getTotalSpace() const override
{
return delegate->getTotalSpace();
}
UInt64 getAvailableSpace() const override
std::optional<UInt64> getAvailableSpace() const override
{
return delegate->getAvailableSpace();
}
UInt64 getUnreservedSpace() const override
std::optional<UInt64> getUnreservedSpace() const override
{
return delegate->getUnreservedSpace();
}

View File

@ -78,7 +78,7 @@ public:
{}
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override
{
@ -175,8 +175,11 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
{
std::lock_guard lock(DiskLocal::reservation_mutex);
UInt64 available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space
? *available_space - std::min(*available_space, reserved_bytes)
: std::numeric_limits<UInt64>::max();
if (bytes == 0)
{
@ -186,6 +189,8 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
}
if (unreserved_space >= bytes)
{
if (available_space)
{
LOG_TRACE(
logger,
@ -193,6 +198,16 @@ std::optional<UInt64> DiskLocal::tryReserve(UInt64 bytes)
ReadableSize(bytes),
backQuote(name),
ReadableSize(unreserved_space));
}
else
{
LOG_TRACE(
logger,
"Reserved {} on local disk {}.",
ReadableSize(bytes),
backQuote(name));
}
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
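
A worked example of the reservation arithmetic above (values are illustrative, not defaults):

/// available = 100 GiB, reserved_bytes = 30 GiB
///   -> unreserved_space = 100 GiB - min(100 GiB, 30 GiB) = 70 GiB, so reserving 10 GiB succeeds.
/// If getAvailableSpace() returns std::nullopt (an unlimited disk),
///   unreserved_space is treated as the UInt64 maximum and any reservation succeeds.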
@ -218,14 +233,14 @@ static UInt64 getTotalSpaceByName(const String & name, const String & disk_path,
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getTotalSpace() const
std::optional<UInt64> DiskLocal::getTotalSpace() const
{
if (broken || readonly)
return 0;
return getTotalSpaceByName(name, disk_path, keep_free_space_bytes);
}
UInt64 DiskLocal::getAvailableSpace() const
std::optional<UInt64> DiskLocal::getAvailableSpace() const
{
if (broken || readonly)
return 0;
@ -242,10 +257,10 @@ UInt64 DiskLocal::getAvailableSpace() const
return total_size - keep_free_space_bytes;
}
UInt64 DiskLocal::getUnreservedSpace() const
std::optional<UInt64> DiskLocal::getUnreservedSpace() const
{
std::lock_guard lock(DiskLocal::reservation_mutex);
auto available_space = getAvailableSpace();
auto available_space = *getAvailableSpace();
available_space -= std::min(available_space, reserved_bytes);
return available_space;
}
@ -417,29 +432,12 @@ bool inline isSameDiskType(const IDisk & one, const IDisk & another)
return typeid(one) == typeid(another);
}
void DiskLocal::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
if (isSameDiskType(*this, *to_disk))
{
fs::path to = fs::path(to_disk->getPath()) / to_path;
fs::path from = fs::path(disk_path) / from_path;
if (from_path.ends_with('/'))
from = from.parent_path();
if (fs::is_directory(from))
to /= from.filename();
fs::copy(from, to, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
}
else
copyThroughBuffers(from_path, to_disk, to_path, /* copy_root_dir */ true); /// Base implementation.
}
void DiskLocal::copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir)
{
if (isSameDiskType(*this, *to_disk))
fs::copy(from_dir, to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
fs::copy(fs::path(disk_path) / from_dir, fs::path(to_disk->getPath()) / to_dir, fs::copy_options::recursive | fs::copy_options::overwrite_existing); /// Use more optimal way.
else
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false); /// Base implementation.
IDisk::copyDirectoryContent(from_dir, to_disk, to_dir);
}
SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
@ -448,7 +446,7 @@ SyncGuardPtr DiskLocal::getDirectorySyncGuard(const String & path) const
}
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap &)
void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & disk_map)
{
String new_disk_path;
UInt64 new_keep_free_space_bytes;
@ -460,10 +458,13 @@ void DiskLocal::applyNewSettings(const Poco::Util::AbstractConfiguration & confi
if (keep_free_space_bytes != new_keep_free_space_bytes)
keep_free_space_bytes = new_keep_free_space_bytes;
IDisk::applyNewSettings(config, context, config_prefix, disk_map);
}
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_)
: IDisk(name_)
DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: IDisk(name_, config, config_prefix)
, disk_path(path_)
, keep_free_space_bytes(keep_free_space_bytes_)
, logger(&Poco::Logger::get("DiskLocal"))
@ -472,13 +473,24 @@ DiskLocal::DiskLocal(const String & name_, const String & path_, UInt64 keep_fre
}
DiskLocal::DiskLocal(
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context, UInt64 local_disk_check_period_ms)
: DiskLocal(name_, path_, keep_free_space_bytes_)
const String & name_, const String & path_, UInt64 keep_free_space_bytes_, ContextPtr context,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: DiskLocal(name_, path_, keep_free_space_bytes_, config, config_prefix)
{
auto local_disk_check_period_ms = config.getUInt("local_disk_check_period_ms", 0);
if (local_disk_check_period_ms > 0)
disk_checker = std::make_unique<DiskLocalCheckThread>(this, context, local_disk_check_period_ms);
}
DiskLocal::DiskLocal(const String & name_, const String & path_)
: IDisk(name_)
, disk_path(path_)
, keep_free_space_bytes(0)
, logger(&Poco::Logger::get("DiskLocal"))
, data_source_description(getLocalDataSourceDescription(disk_path))
{
}
DataSourceDescription DiskLocal::getDataSourceDescription() const
{
return data_source_description;
@ -720,7 +732,7 @@ void registerDiskLocal(DiskFactory & factory, bool global_skip_access_check)
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);
std::shared_ptr<IDisk> disk
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config.getUInt("local_disk_check_period_ms", 0));
= std::make_shared<DiskLocal>(name, path, keep_free_space_bytes, context, config, config_prefix);
disk->startup(context, skip_access_check);
return disk;
};

View File

@ -19,23 +19,25 @@ public:
friend class DiskLocalCheckThread;
friend class DiskLocalReservation;
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_);
DiskLocal(const String & name_, const String & path_, UInt64 keep_free_space_bytes_,
const Poco::Util::AbstractConfiguration & config, const String & config_prefix);
DiskLocal(
const String & name_,
const String & path_,
UInt64 keep_free_space_bytes_,
ContextPtr context,
UInt64 local_disk_check_period_ms);
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);
DiskLocal(const String & name_, const String & path_);
const String & getPath() const override { return disk_path; }
ReservationPtr reserve(UInt64 bytes) override;
UInt64 getTotalSpace() const override;
UInt64 getAvailableSpace() const override;
UInt64 getUnreservedSpace() const override;
std::optional<UInt64> getTotalSpace() const override;
std::optional<UInt64> getAvailableSpace() const override;
std::optional<UInt64> getUnreservedSpace() const override;
UInt64 getKeepingFreeSpace() const override { return keep_free_space_bytes; }
@ -63,8 +65,6 @@ public:
void replaceFile(const String & from_path, const String & to_path) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir) override;
void listFiles(const String & path, std::vector<String> & file_names) const override;

View File

@ -53,7 +53,7 @@ void DiskSelector::initialize(const Poco::Util::AbstractConfiguration & config,
disks.emplace(
default_disk_name,
std::make_shared<DiskLocal>(
default_disk_name, context->getPath(), 0, context, config.getUInt("local_disk_check_period_ms", 0)));
default_disk_name, context->getPath(), 0, context, config, config_prefix));
}
is_initialized = true;

View File

@ -1,42 +0,0 @@
#pragma once
#include <future>
#include <functional>
namespace DB
{
/// Interface to run task asynchronously with possibility to wait for execution.
class Executor
{
public:
virtual ~Executor() = default;
virtual std::future<void> execute(std::function<void()> task) = 0;
};
/// Executes task synchronously in case when disk doesn't support async operations.
class SyncExecutor : public Executor
{
public:
SyncExecutor() = default;
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
try
{
task();
promise->set_value();
}
catch (...)
{
try
{
promise->set_exception(std::current_exception());
}
catch (...) { }
}
return promise->get_future();
}
};
}

View File

@ -1,5 +1,4 @@
#include "IDisk.h"
#include "Disks/Executor.h"
#include <IO/ReadBufferFromFileBase.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/copyData.h>
@ -80,18 +79,33 @@ UInt128 IDisk::getEncryptedFileIV(const String &) const
using ResultsCollector = std::vector<std::future<void>>;
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, Executor & exec, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_path, ThreadPool & pool, ResultsCollector & results, bool copy_root_dir, const WriteSettings & settings)
{
if (from_disk.isFile(from_path))
{
auto result = exec.execute(
[&from_disk, from_path, &to_disk, to_path, &settings]()
auto promise = std::make_shared<std::promise<void>>();
auto future = promise->get_future();
pool.scheduleOrThrowOnError(
[&from_disk, from_path, &to_disk, to_path, &settings, promise, thread_group = CurrentThread::getGroup()]()
{
setThreadName("DiskCopier");
try
{
SCOPE_EXIT_SAFE(if (thread_group) CurrentThread::detachFromGroupIfNotDetached(););
if (thread_group)
CurrentThread::attachToGroup(thread_group);
from_disk.copyFile(from_path, to_disk, fs::path(to_path) / fileName(from_path), settings);
promise->set_value();
}
catch (...)
{
promise->set_exception(std::current_exception());
}
});
results.push_back(std::move(result));
results.push_back(std::move(future));
}
else
{
@ -104,13 +118,12 @@ void asyncCopy(IDisk & from_disk, String from_path, IDisk & to_disk, String to_p
}
for (auto it = from_disk.iterateDirectory(from_path); it->isValid(); it->next())
asyncCopy(from_disk, it->path(), to_disk, dest, exec, results, true, settings);
asyncCopy(from_disk, it->path(), to_disk, dest, pool, results, true, settings);
}
}
void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path, bool copy_root_dir)
{
auto & exec = to_disk->getExecutor();
ResultsCollector results;
WriteSettings settings;
@ -118,17 +131,12 @@ void IDisk::copyThroughBuffers(const String & from_path, const std::shared_ptr<I
/// Avoid high memory usage. See test_s3_zero_copy_ttl/test.py::test_move_and_s3_memory_usage
settings.s3_allow_parallel_part_upload = false;
asyncCopy(*this, from_path, *to_disk, to_path, exec, results, copy_root_dir, settings);
asyncCopy(*this, from_path, *to_disk, to_path, copying_thread_pool, results, copy_root_dir, settings);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
}
void IDisk::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
{
copyThroughBuffers(from_path, to_disk, to_path, true);
result.get(); /// May rethrow an exception
}
@ -137,7 +145,7 @@ void IDisk::copyDirectoryContent(const String & from_dir, const std::shared_ptr<
if (!to_disk->exists(to_dir))
to_disk->createDirectories(to_dir);
copyThroughBuffers(from_dir, to_disk, to_dir, false);
copyThroughBuffers(from_dir, to_disk, to_dir, /* copy_root_dir */ false);
}
void IDisk::truncateFile(const String &, size_t)
@ -233,4 +241,9 @@ catch (Exception & e)
throw;
}
void IDisk::applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr /*context*/, const String & config_prefix, const DisksMap & /*map*/)
{
copying_thread_pool.setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
}
}
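
The promise/future pattern used in asyncCopy above, shown in isolation as a hedged sketch (pool, doCopyOneFile and the surrounding setup are assumptions for illustration):

/// Schedule a unit of work on a ThreadPool and propagate both completion and
/// exceptions back to the caller through a std::future.
std::vector<std::future<void>> results;
auto promise = std::make_shared<std::promise<void>>();
results.push_back(promise->get_future());
pool.scheduleOrThrowOnError([promise]
{
    try
    {
        doCopyOneFile();                          /// hypothetical unit of work
        promise->set_value();
    }
    catch (...)
    {
        promise->set_exception(std::current_exception());
    }
});
for (auto & result : results)
    result.get();                                 /// rethrows the first captured exception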

View File

@ -6,7 +6,6 @@
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/Exception.h>
#include <Disks/Executor.h>
#include <Disks/DiskType.h>
#include <IO/ReadSettings.h>
#include <IO/WriteSettings.h>
@ -35,6 +34,12 @@ namespace Poco
}
}
namespace CurrentMetrics
{
extern const Metric IDiskCopierThreads;
extern const Metric IDiskCopierThreadsActive;
}
namespace DB
{
@ -110,9 +115,15 @@ class IDisk : public Space
{
public:
/// Default constructor.
explicit IDisk(const String & name_, std::shared_ptr<Executor> executor_ = std::make_shared<SyncExecutor>())
IDisk(const String & name_, const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
: name(name_)
, executor(executor_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, config.getUInt(config_prefix + ".thread_pool_size", 16))
{
}
explicit IDisk(const String & name_)
: name(name_)
, copying_thread_pool(CurrentMetrics::IDiskCopierThreads, CurrentMetrics::IDiskCopierThreadsActive, 16)
{
}
@ -129,13 +140,13 @@ public:
const String & getName() const override { return name; }
/// Total available space on the disk.
virtual UInt64 getTotalSpace() const = 0;
virtual std::optional<UInt64> getTotalSpace() const = 0;
/// Space currently available on the disk.
virtual UInt64 getAvailableSpace() const = 0;
virtual std::optional<UInt64> getAvailableSpace() const = 0;
/// Space available for reservation (available space minus reserved space).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Amount of bytes which should be kept free on the disk.
virtual UInt64 getKeepingFreeSpace() const { return 0; }
@ -181,9 +192,6 @@ public:
/// If a file with `to_path` path already exists, it will be replaced.
virtual void replaceFile(const String & from_path, const String & to_path) = 0;
/// Recursively copy data containing at `from_path` to `to_path` located at `to_disk`.
virtual void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path);
/// Recursively copy files from from_dir to to_dir. Create to_dir if not exists.
virtual void copyDirectoryContent(const String & from_dir, const std::shared_ptr<IDisk> & to_disk, const String & to_dir);
@ -379,7 +387,7 @@ public:
virtual SyncGuardPtr getDirectorySyncGuard(const String & path) const;
/// Applies new settings for disk in runtime.
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration &, ContextPtr, const String &, const DisksMap &) {}
virtual void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context, const String & config_prefix, const DisksMap & map);
/// Quite leaky abstraction. Some disks can use additional disk to store
/// some parts of metadata. In general case we have only one disk itself and
@ -459,9 +467,6 @@ protected:
const String name;
/// Returns executor to perform asynchronous operations.
virtual Executor & getExecutor() { return *executor; }
/// Base implementation of the function copy().
/// It just opens two files, reads data by portions from the first file, and writes it to the second one.
/// A derived class may override copy() to provide a faster implementation.
@ -470,7 +475,7 @@ protected:
virtual void checkAccessImpl(const String & path);
private:
std::shared_ptr<Executor> executor;
ThreadPool copying_thread_pool;
bool is_custom_disk = false;
/// Check access to the disk.
@ -490,7 +495,7 @@ public:
/// Space available for reservation
/// (with this reservation already take into account).
virtual UInt64 getUnreservedSpace() const = 0;
virtual std::optional<UInt64> getUnreservedSpace() const = 0;
/// Get i-th disk where reservation take place.
virtual DiskPtr getDisk(size_t i = 0) const = 0; /// NOLINT

View File

@ -42,23 +42,17 @@ namespace ErrorCodes
extern const int ARGUMENT_OUT_OF_BOUND;
}
static size_t chooseBufferSize(const ReadSettings & settings, size_t file_size)
{
/// Buffers used for prefetch or pre-download should be large enough, but not bigger than the whole file.
return std::min<size_t>(std::max<size_t>(settings.prefetch_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
}
AsynchronousBoundedReadBuffer::AsynchronousBoundedReadBuffer(
ImplPtr impl_,
IAsynchronousReader & reader_,
const ReadSettings & settings_,
AsyncReadCountersPtr async_read_counters_,
FilesystemReadPrefetchesLogPtr prefetches_log_)
: ReadBufferFromFileBase(chooseBufferSize(settings_, impl_->getFileSize()), nullptr, 0)
: ReadBufferFromFileBase(chooseBufferSizeForRemoteReading(settings_, impl_->getFileSize()), nullptr, 0)
, impl(std::move(impl_))
, read_settings(settings_)
, reader(reader_)
, prefetch_buffer(chooseBufferSize(settings_, impl->getFileSize()))
, prefetch_buffer(chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()))
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
, log(&Poco::Logger::get("AsynchronousBoundedReadBuffer"))
@ -111,7 +105,7 @@ void AsynchronousBoundedReadBuffer::prefetch(Priority priority)
last_prefetch_info.submit_time = std::chrono::system_clock::now();
last_prefetch_info.priority = priority;
chassert(prefetch_buffer.size() == chooseBufferSize(read_settings, impl->getFileSize()));
chassert(prefetch_buffer.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
prefetch_future = asyncReadInto(prefetch_buffer.data(), prefetch_buffer.size(), priority);
ProfileEvents::increment(ProfileEvents::RemoteFSPrefetches);
}
@ -190,7 +184,7 @@ bool AsynchronousBoundedReadBuffer::nextImpl()
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::SynchronousRemoteReadWaitMicroseconds);
chassert(memory.size() == chooseBufferSize(read_settings, impl->getFileSize()));
chassert(memory.size() == chooseBufferSizeForRemoteReading(read_settings, impl->getFileSize()));
std::tie(size, offset) = impl->readInto(memory.data(), memory.size(), file_offset_of_buffer_end, bytes_to_ignore);
ProfileEvents::increment(ProfileEvents::RemoteFSUnprefetchedReads);

View File

@ -74,22 +74,19 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
}
void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
const FileSegment & file_segment, CachedOnDiskReadBufferFromFile::ReadType type)
const FileSegment::Range & file_segment_range, CachedOnDiskReadBufferFromFile::ReadType type)
{
if (!cache_log)
return;
const auto range = file_segment.range();
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_file_path,
.file_segment_range = { range.left, range.right },
.file_segment_range = { file_segment_range.left, file_segment_range.right },
.requested_range = { first_offset, read_until_position },
.file_segment_key = file_segment.key().toString(),
.file_segment_offset = file_segment.offset(),
.file_segment_size = range.size(),
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = true,
.read_buffer_id = current_buffer_id,
.profile_counters = std::make_shared<ProfileEvents::Counters::Snapshot>(
@ -498,7 +495,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto completed_range = current_file_segment->range();
if (cache_log)
appendFilesystemCacheLog(*current_file_segment, read_type);
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -521,7 +518,7 @@ CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front(), read_type);
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}
}
@ -1090,6 +1087,10 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
first_offset,
file_segments->toString());
/// Release buffer a little bit earlier.
if (read_until_position == file_offset_of_buffer_end)
implementation_buffer.reset();
return result;
}

View File

@ -90,7 +90,7 @@ private:
bool completeFileSegmentAndGetNext();
void appendFilesystemCacheLog(const FileSegment & file_segment, ReadType read_type);
void appendFilesystemCacheLog(const FileSegment::Range & file_segment_range, ReadType read_type);
bool writeCache(char * data, size_t size, size_t offset, FileSegment & file_segment);

View File

@ -2,14 +2,27 @@
#include <IO/SeekableReadBuffer.h>
#include <iostream>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Disks/ObjectStorages/Cached/CachedObjectStorage.h>
#include <Common/logger_useful.h>
#include <IO/ReadSettings.h>
#include <IO/SwapHelper.h>
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <base/hex.h>
#include <Common/logger_useful.h>
using namespace DB;
namespace
{
bool withCache(const ReadSettings & settings)
{
return settings.remote_fs_cache && settings.enable_filesystem_cache
&& (!CurrentThread::getQueryId().empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
}
namespace DB
{
@ -18,29 +31,35 @@ namespace ErrorCodes
extern const int CANNOT_SEEK_THROUGH_FILE;
}
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size)
{
/// Only when the cache is used can we download bigger portions of FileSegments than what we are actually going to read within a particular task.
if (!withCache(settings))
return settings.remote_fs_buffer_size;
/// Buffers used for prefetch and pre-download should be large enough, but not bigger than the whole file.
return std::min<size_t>(std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE), file_size);
}
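
For intuition, a worked example of the sizing logic above with assumed values (not taken from actual defaults):

/// remote_fs_buffer_size = 1 MiB, DBMS_DEFAULT_BUFFER_SIZE = 1 MiB, file_size = 64 KiB.
/// Without the cache: the result is simply remote_fs_buffer_size (1 MiB).
/// With the cache:    min(max(1 MiB, 1 MiB), 64 KiB) = 64 KiB, i.e. the buffer
/// is large enough for pre-download but never exceeds the whole file.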
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
bool use_external_buffer_)
: ReadBufferFromFileBase(use_external_buffer_ ? 0 : settings_.remote_fs_buffer_size, nullptr, 0)
: ReadBufferFromFileBase(
use_external_buffer_ ? 0 : chooseBufferSizeForRemoteReading(settings_, getTotalSize(blobs_to_read_)), nullptr, 0)
, settings(settings_)
, blobs_to_read(blobs_to_read_)
, read_buffer_creator(std::move(read_buffer_creator_))
, cache_log(settings.enable_filesystem_cache_log ? cache_log_ : nullptr)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, query_id(CurrentThread::getQueryId())
, use_external_buffer(use_external_buffer_)
, with_cache(withCache(settings))
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
{
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!query_id.empty()
|| settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache
|| !settings.avoid_readthrough_cache_outside_query_context);
}
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
@ -90,8 +109,6 @@ void ReadBufferFromRemoteFSGather::appendUncachedReadInfo()
.source_file_path = current_object.remote_path,
.file_segment_range = { 0, current_object.bytes_size },
.cache_type = FilesystemCacheLogElement::CacheType::READ_FROM_FS_BYPASSING_CACHE,
.file_segment_key = {},
.file_segment_offset = {},
.file_segment_size = current_object.bytes_size,
.read_from_cache_attempted = false,
};

View File

@ -73,7 +73,7 @@ private:
const std::shared_ptr<FilesystemCacheLog> cache_log;
const String query_id;
const bool use_external_buffer;
bool with_cache;
const bool with_cache;
size_t read_until_position = 0;
size_t file_offset_of_buffer_end = 0;
@ -86,4 +86,5 @@ private:
Poco::Logger * log;
};
size_t chooseBufferSizeForRemoteReading(const DB::ReadSettings & settings, size_t file_size);
}

View File

@ -49,11 +49,18 @@ IVolume::IVolume(
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Volume must contain at least one disk");
}
UInt64 IVolume::getMaxUnreservedFreeSpace() const
std::optional<UInt64> IVolume::getMaxUnreservedFreeSpace() const
{
UInt64 res = 0;
std::optional<UInt64> res;
for (const auto & disk : disks)
res = std::max(res, disk->getUnreservedSpace());
{
auto disk_unreserved_space = disk->getUnreservedSpace();
if (!disk_unreserved_space)
return std::nullopt; /// There is at least one unlimited disk.
if (!res || *disk_unreserved_space > *res)
res = disk_unreserved_space;
}
return res;
}
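The optional return value lets "unlimited" propagate: if any disk reports no bound on its unreserved space, the volume as a whole reports none. A minimal sketch of that fold over a toy disk type:

// Minimal sketch of the "unlimited wins" fold above; Disk is a toy stand-in.
#include <cstdint>
#include <optional>
#include <vector>

struct Disk
{
    std::optional<uint64_t> unreserved;   /// nullopt means the disk reports no limit
};

std::optional<uint64_t> maxUnreservedFreeSpace(const std::vector<Disk> & disks)
{
    std::optional<uint64_t> res;
    for (const auto & disk : disks)
    {
        if (!disk.unreserved)
            return std::nullopt;          /// at least one unlimited disk -> the volume is unlimited
        if (!res || *disk.unreserved > *res)
            res = disk.unreserved;
    }
    return res;                           /// also empty when there are no disks
}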

View File

@ -74,7 +74,7 @@ public:
virtual VolumeType getType() const = 0;
/// Return the biggest unreserved space across all disks (empty if at least one disk reports no limit)
UInt64 getMaxUnreservedFreeSpace() const;
std::optional<UInt64> getMaxUnreservedFreeSpace() const;
DiskPtr getDisk() const { return getDisk(0); }
virtual DiskPtr getDisk(size_t i) const { return disks[i]; }

View File

@ -31,9 +31,6 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context));
uint64_t copy_thread_pool_size = config.getUInt(config_prefix + ".thread_pool_size", 16);
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);
auto metadata_storage = std::make_shared<MetadataStorageFromDisk>(metadata_disk, "");
std::shared_ptr<IDisk> azure_blob_storage_disk = std::make_shared<DiskObjectStorage>(
@ -42,8 +39,8 @@ void registerDiskAzureBlobStorage(DiskFactory & factory, bool global_skip_access
"DiskAzureBlobStorage",
std::move(metadata_storage),
std::move(azure_object_storage),
send_metadata,
copy_thread_pool_size
config,
config_prefix
);
bool skip_access_check = global_skip_access_check || config.getBool(config_prefix + ".skip_access_check", false);

View File

@ -18,12 +18,6 @@
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
namespace CurrentMetrics
{
extern const Metric DiskObjectStorageAsyncThreads;
extern const Metric DiskObjectStorageAsyncThreadsActive;
}
namespace DB
{
@ -37,55 +31,6 @@ namespace ErrorCodes
extern const int DIRECTORY_DOESNT_EXIST;
}
namespace
{
/// Runs tasks asynchronously using thread pool.
class AsyncThreadPoolExecutor : public Executor
{
public:
AsyncThreadPoolExecutor(const String & name_, int thread_pool_size)
: name(name_)
, pool(CurrentMetrics::DiskObjectStorageAsyncThreads, CurrentMetrics::DiskObjectStorageAsyncThreadsActive, thread_pool_size)
{}
std::future<void> execute(std::function<void()> task) override
{
auto promise = std::make_shared<std::promise<void>>();
pool.scheduleOrThrowOnError(
[promise, task]()
{
try
{
task();
promise->set_value();
}
catch (...)
{
tryLogCurrentException("Failed to run async task");
try
{
promise->set_exception(std::current_exception());
}
catch (...) {}
}
});
return promise->get_future();
}
void setMaxThreads(size_t threads)
{
pool.setMaxThreads(threads);
}
private:
String name;
ThreadPool pool;
};
}
DiskTransactionPtr DiskObjectStorage::createTransaction()
{
@ -105,27 +50,20 @@ DiskTransactionPtr DiskObjectStorage::createObjectStorageTransaction()
send_metadata ? metadata_helper.get() : nullptr);
}
std::shared_ptr<Executor> DiskObjectStorage::getAsyncExecutor(const std::string & log_name, size_t size)
{
static auto reader = std::make_shared<AsyncThreadPoolExecutor>(log_name, size);
return reader;
}
DiskObjectStorage::DiskObjectStorage(
const String & name_,
const String & object_storage_root_path_,
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_)
: IDisk(name_, getAsyncExecutor(log_name, thread_pool_size_))
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
: IDisk(name_, config, config_prefix)
, object_storage_root_path(object_storage_root_path_)
, log (&Poco::Logger::get("DiskObjectStorage(" + log_name + ")"))
, metadata_storage(std::move(metadata_storage_))
, object_storage(std::move(object_storage_))
, send_metadata(send_metadata_)
, threadpool_size(thread_pool_size_)
, send_metadata(config.getBool(config_prefix + ".send_metadata", false))
, metadata_helper(std::make_unique<DiskObjectStorageRemoteMetadataRestoreHelper>(this, ReadSettings{}))
{}
@ -234,19 +172,23 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
transaction->commit();
}
void DiskObjectStorage::copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path)
void DiskObjectStorage::copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const WriteSettings & settings)
{
/// It's the same object storage disk
if (this == to_disk.get())
if (this == &to_disk)
{
/// It may use S3 server-side copy
auto transaction = createObjectStorageTransaction();
transaction->copyFile(from_path, to_path);
transaction->copyFile(from_file_path, to_file_path);
transaction->commit();
}
else
{
IDisk::copy(from_path, to_disk, to_path);
/// Copy through buffers
IDisk::copyFile(from_file_path, to_disk, to_file_path, settings);
}
}
@ -468,18 +410,25 @@ void DiskObjectStorage::removeSharedRecursive(
transaction->commit();
}
std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
bool DiskObjectStorage::tryReserve(UInt64 bytes)
{
std::lock_guard lock(reservation_mutex);
auto available_space = getAvailableSpace();
UInt64 unreserved_space = available_space - std::min(available_space, reserved_bytes);
if (!available_space)
{
++reservation_count;
reserved_bytes += bytes;
return true;
}
UInt64 unreserved_space = *available_space - std::min(*available_space, reserved_bytes);
if (bytes == 0)
{
LOG_TRACE(log, "Reserved 0 bytes on remote disk {}", backQuote(name));
++reservation_count;
return {unreserved_space};
return true;
}
if (unreserved_space >= bytes)
@ -492,14 +441,14 @@ std::optional<UInt64> DiskObjectStorage::tryReserve(UInt64 bytes)
ReadableSize(unreserved_space));
++reservation_count;
reserved_bytes += bytes;
return {unreserved_space - bytes};
return true;
}
else
{
LOG_TRACE(log, "Could not reserve {} on remote disk {}. Not enough unreserved space", ReadableSize(bytes), backQuote(name));
}
return {};
return false;
}
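tryReserve() now only reports whether the reservation succeeded; a disk whose available space is unknown (an unlimited object storage disk) always accepts the reservation. A toy, lock-free sketch of the same decision, with illustrative names:

// Toy sketch of the reservation decision above (no locking, no logging).
#include <algorithm>
#include <cstdint>
#include <optional>

struct ToyDisk
{
    std::optional<uint64_t> available_space;   /// nullopt: space is not limited
    uint64_t reserved_bytes = 0;

    bool tryReserve(uint64_t bytes)
    {
        if (!available_space)                  /// unlimited disk: always succeed
        {
            reserved_bytes += bytes;
            return true;
        }
        uint64_t unreserved = *available_space - std::min(*available_space, reserved_bytes);
        if (bytes == 0 || unreserved >= bytes)
        {
            reserved_bytes += bytes;
            return true;
        }
        return false;                          /// not enough unreserved space
    }
};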
bool DiskObjectStorage::supportsCache() const
@ -519,14 +468,15 @@ bool DiskObjectStorage::isWriteOnce() const
DiskObjectStoragePtr DiskObjectStorage::createDiskObjectStorage()
{
const auto config_prefix = "storage_configuration.disks." + name;
return std::make_shared<DiskObjectStorage>(
getName(),
object_storage_root_path,
getName(),
metadata_storage,
object_storage,
send_metadata,
threadpool_size);
Context::getGlobalContextInstance()->getConfigRef(),
config_prefix);
}
std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
@ -582,13 +532,12 @@ void DiskObjectStorage::writeFileUsingBlobWritingFunction(const String & path, W
}
void DiskObjectStorage::applyNewSettings(
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &)
const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String & /*config_prefix*/, const DisksMap & disk_map)
{
/// FIXME: we cannot use the config_prefix passed through the arguments, because the disk may be wrapped with a cache and we need another name
const auto config_prefix = "storage_configuration.disks." + name;
object_storage->applyNewSettings(config, config_prefix, context_);
if (AsyncThreadPoolExecutor * exec = dynamic_cast<AsyncThreadPoolExecutor *>(&getExecutor()))
exec->setMaxThreads(config.getInt(config_prefix + ".thread_pool_size", 16));
IDisk::applyNewSettings(config, context_, config_prefix, disk_map);
}
void DiskObjectStorage::restoreMetadataIfNeeded(

View File

@ -33,8 +33,8 @@ public:
const String & log_name,
MetadataStoragePtr metadata_storage_,
ObjectStoragePtr object_storage_,
bool send_metadata_,
uint64_t thread_pool_size_);
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);
/// Create fake transaction
DiskTransactionPtr createTransaction() override;
@ -53,11 +53,9 @@ public:
const std::string & getCacheName() const override { return object_storage->getCacheName(); }
UInt64 getTotalSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getAvailableSpace() const override { return std::numeric_limits<UInt64>::max(); }
UInt64 getUnreservedSpace() const override { return std::numeric_limits<UInt64>::max(); }
std::optional<UInt64> getTotalSpace() const override { return {}; }
std::optional<UInt64> getAvailableSpace() const override { return {}; }
std::optional<UInt64> getUnreservedSpace() const override { return {}; }
UInt64 getKeepingFreeSpace() const override { return 0; }
@ -152,7 +150,11 @@ public:
Strings getBlobPath(const String & path) const override;
void writeFileUsingBlobWritingFunction(const String & path, WriteMode mode, WriteBlobFunction && write_blob_function) override;
void copy(const String & from_path, const std::shared_ptr<IDisk> & to_disk, const String & to_path) override;
void copyFile( /// NOLINT
const String & from_file_path,
IDisk & to_disk,
const String & to_file_path,
const WriteSettings & settings = {}) override;
void applyNewSettings(const Poco::Util::AbstractConfiguration & config, ContextPtr context_, const String &, const DisksMap &) override;
@ -198,8 +200,6 @@ public:
NameSet getCacheLayersNames() const override;
#endif
static std::shared_ptr<Executor> getAsyncExecutor(const std::string & log_name, size_t size);
bool supportsStat() const override { return metadata_storage->supportsStat(); }
struct stat stat(const String & path) const override;
@ -222,10 +222,9 @@ private:
UInt64 reservation_count = 0;
std::mutex reservation_mutex;
std::optional<UInt64> tryReserve(UInt64 bytes);
bool tryReserve(UInt64 bytes);
const bool send_metadata;
size_t threadpool_size;
std::unique_ptr<DiskObjectStorageRemoteMetadataRestoreHelper> metadata_helper;
};
@ -243,7 +242,7 @@ public:
UInt64 getSize() const override { return size; }
UInt64 getUnreservedSpace() const override { return unreserved_space; }
std::optional<UInt64> getUnreservedSpace() const override { return unreserved_space; }
DiskPtr getDisk(size_t i) const override;

View File

@ -25,7 +25,7 @@ std::pair<String, DiskPtr> prepareForLocalMetadata(
/// where the metadata files are stored locally
auto metadata_path = getDiskMetadataPath(name, config, config_prefix, context);
fs::create_directories(metadata_path);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0);
auto metadata_disk = std::make_shared<DiskLocal>(name + "-metadata", metadata_path, 0, config, config_prefix);
return std::make_pair(metadata_path, metadata_disk);
}

View File

@ -8,6 +8,14 @@
#include <IO/WriteBufferFromFile.h>
#include <Common/checkStackSize.h>
#include <Common/logger_useful.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics
{
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
namespace DB
{
@ -101,7 +109,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateFileToRestorableSchema
updateObjectMetadata(object.remote_path, metadata);
}
}
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, Futures & results)
void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecursive(const String & path, ThreadPool & pool)
{
checkStackSize(); /// This is needed to prevent stack overflow in case of cyclic symlinks.
@ -120,29 +128,26 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchemaRecu
/// The whole directory can be migrated asynchronously.
if (dir_contains_only_files)
{
auto result = disk->getExecutor().execute([this, path]
pool.scheduleOrThrowOnError([this, path]
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
migrateFileToRestorableSchema(it->path());
});
results.push_back(std::move(result));
}
else
{
for (auto it = disk->iterateDirectory(path); it->isValid(); it->next())
if (!disk->isDirectory(it->path()))
{
auto source_path = it->path();
auto result = disk->getExecutor().execute([this, source_path]
if (disk->isDirectory(it->path()))
{
migrateFileToRestorableSchema(source_path);
});
results.push_back(std::move(result));
migrateToRestorableSchemaRecursive(it->path(), pool);
}
else
migrateToRestorableSchemaRecursive(it->path(), results);
{
auto source_path = it->path();
pool.scheduleOrThrowOnError([this, source_path] { migrateFileToRestorableSchema(source_path); });
}
}
}
}
@ -153,16 +158,13 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::migrateToRestorableSchema()
{
LOG_INFO(disk->log, "Start migration to restorable schema for disk {}", disk->name);
Futures results;
ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
for (const auto & root : data_roots)
if (disk->exists(root))
migrateToRestorableSchemaRecursive(root + '/', results);
migrateToRestorableSchemaRecursive(root + '/', pool);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
pool.wait();
saveSchemaVersion(RESTORABLE_SCHEMA_VERSION);
}
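The helper now fans work out to a ThreadPool and waits on it, instead of collecting std::future results from the per-disk executor. A generic sketch of that schedule-then-wait pattern using only the standard library (ClickHouse's own ThreadPool exposes scheduleOrThrowOnError() and wait() for the same purpose):

// Generic sketch of the fan-out / wait pattern the hunk switches to.
#include <functional>
#include <thread>
#include <vector>

class SimplePool
{
public:
    void scheduleOrThrowOnError(std::function<void()> task)
    {
        workers.emplace_back(std::move(task));   /// one thread per task, good enough for a sketch
    }

    void wait()
    {
        for (auto & worker : workers)
            worker.join();
        workers.clear();
    }

private:
    std::vector<std::thread> workers;
};

// Usage mirroring migrateToRestorableSchema() (data_roots and migrate() are hypothetical here):
// SimplePool pool;
// for (const auto & root : data_roots)
//     pool.scheduleOrThrowOnError([&, root] { migrate(root); });
// pool.wait();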
@ -355,8 +357,8 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
{
LOG_INFO(disk->log, "Starting restore files for disk {}", disk->name);
std::vector<std::future<void>> results;
auto restore_files = [this, &source_object_storage, &restore_information, &results](const RelativePathsWithMetadata & objects)
ThreadPool pool{CurrentMetrics::LocalThread, CurrentMetrics::LocalThreadActive};
auto restore_files = [this, &source_object_storage, &restore_information, &pool](const RelativePathsWithMetadata & objects)
{
std::vector<String> keys_names;
for (const auto & object : objects)
@ -378,12 +380,10 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
if (!keys_names.empty())
{
auto result = disk->getExecutor().execute([this, &source_object_storage, &restore_information, keys_names]()
pool.scheduleOrThrowOnError([this, &source_object_storage, &restore_information, keys_names]()
{
processRestoreFiles(source_object_storage, restore_information.source_path, keys_names);
});
results.push_back(std::move(result));
}
return true;
@ -394,10 +394,7 @@ void DiskObjectStorageRemoteMetadataRestoreHelper::restoreFiles(IObjectStorage *
restore_files(children);
for (auto & result : results)
result.wait();
for (auto & result : results)
result.get();
pool.wait();
LOG_INFO(disk->log, "Files are restored for disk {}", disk->name);

Some files were not shown because too many files have changed in this diff.