mirror of
https://github.com/ClickHouse/ClickHouse.git
synced 2024-11-18 21:51:57 +00:00
Merge branch 'master' into improve_build
This commit is contained in:
commit
a5e80af1e7
46
.github/workflows/master.yml
vendored
46
.github/workflows/master.yml
vendored
@ -887,6 +887,51 @@ jobs:
|
||||
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
|
||||
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
|
||||
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
|
||||
BuilderBinAarch64V80Compat:
|
||||
needs: [DockerHubPush]
|
||||
runs-on: [self-hosted, builder]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/build_check
|
||||
IMAGES_PATH=${{runner.temp}}/images_path
|
||||
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
|
||||
CACHES_PATH=${{runner.temp}}/../ccaches
|
||||
BUILD_NAME=binary_aarch64_v80compat
|
||||
EOF
|
||||
- name: Download changed images
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: changed_images
|
||||
path: ${{ env.IMAGES_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
with:
|
||||
fetch-depth: 0 # otherwise we will have no info about contributors
|
||||
- name: Build
|
||||
run: |
|
||||
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
|
||||
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
|
||||
- name: Upload build URLs to artifacts
|
||||
if: ${{ success() || failure() }}
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: ${{ env.BUILD_URLS }}
|
||||
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
|
||||
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
|
||||
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
|
||||
############################################################################################
|
||||
##################################### Docker images #######################################
|
||||
############################################################################################
|
||||
@ -972,6 +1017,7 @@ jobs:
|
||||
# - BuilderBinGCC
|
||||
- BuilderBinPPC64
|
||||
- BuilderBinAmd64SSE2
|
||||
- BuilderBinAarch64V80Compat
|
||||
- BuilderBinClangTidy
|
||||
- BuilderDebShared
|
||||
runs-on: [self-hosted, style-checker]
|
||||
|
44
.github/workflows/pull_request.yml
vendored
44
.github/workflows/pull_request.yml
vendored
@ -940,6 +940,49 @@ jobs:
|
||||
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
|
||||
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
|
||||
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
|
||||
BuilderBinAarch64V80Compat:
|
||||
needs: [DockerHubPush, FastTest, StyleCheck]
|
||||
runs-on: [self-hosted, builder]
|
||||
steps:
|
||||
- name: Set envs
|
||||
run: |
|
||||
cat >> "$GITHUB_ENV" << 'EOF'
|
||||
TEMP_PATH=${{runner.temp}}/build_check
|
||||
IMAGES_PATH=${{runner.temp}}/images_path
|
||||
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
|
||||
CACHES_PATH=${{runner.temp}}/../ccaches
|
||||
BUILD_NAME=binary_aarch64_v80compat
|
||||
EOF
|
||||
- name: Download changed images
|
||||
uses: actions/download-artifact@v2
|
||||
with:
|
||||
name: changed_images
|
||||
path: ${{ env.IMAGES_PATH }}
|
||||
- name: Clear repository
|
||||
run: |
|
||||
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
|
||||
- name: Check out repository code
|
||||
uses: actions/checkout@v2
|
||||
- name: Build
|
||||
run: |
|
||||
git -C "$GITHUB_WORKSPACE" submodule sync --recursive
|
||||
git -C "$GITHUB_WORKSPACE" submodule update --depth=1 --recursive --init --jobs=10
|
||||
sudo rm -fr "$TEMP_PATH"
|
||||
mkdir -p "$TEMP_PATH"
|
||||
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
|
||||
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
|
||||
- name: Upload build URLs to artifacts
|
||||
if: ${{ success() || failure() }}
|
||||
uses: actions/upload-artifact@v2
|
||||
with:
|
||||
name: ${{ env.BUILD_URLS }}
|
||||
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
|
||||
- name: Cleanup
|
||||
if: always()
|
||||
run: |
|
||||
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
|
||||
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
|
||||
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
|
||||
############################################################################################
|
||||
##################################### Docker images #######################################
|
||||
############################################################################################
|
||||
@ -1025,6 +1068,7 @@ jobs:
|
||||
# - BuilderBinGCC
|
||||
- BuilderBinPPC64
|
||||
- BuilderBinAmd64SSE2
|
||||
- BuilderBinAarch64V80Compat
|
||||
- BuilderBinClangTidy
|
||||
- BuilderDebShared
|
||||
runs-on: [self-hosted, style-checker]
|
||||
|
@ -1,75 +0,0 @@
|
||||
#pragma once
|
||||
|
||||
#include <map>
|
||||
#include <tuple>
|
||||
#include <mutex>
|
||||
#include "FnTraits.h"
|
||||
|
||||
/**
|
||||
* Caching proxy for a functor that decays to a pointer-to-function.
|
||||
* Saves pairs (func args, func result on args).
|
||||
* Cache size is unlimited. Cache items are evicted only on manual drop.
|
||||
* Invocation/update is O(log(saved cache values)).
|
||||
*
|
||||
* See Common/tests/cached_fn.cpp for examples.
|
||||
*/
|
||||
template <auto * Func>
|
||||
struct CachedFn
|
||||
{
|
||||
private:
|
||||
using Traits = FnTraits<decltype(Func)>;
|
||||
using DecayedArgs = TypeListMap<std::decay_t, typename Traits::Args>;
|
||||
using Key = TypeListChangeRoot<std::tuple, DecayedArgs>;
|
||||
using Result = typename Traits::Ret;
|
||||
|
||||
std::map<Key, Result> cache; // Can't use hashmap as tuples are unhashable by default
|
||||
mutable std::mutex mutex;
|
||||
|
||||
public:
|
||||
template <class ...Args>
|
||||
Result operator()(Args && ...args)
|
||||
{
|
||||
Key key{std::forward<Args>(args)...};
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
|
||||
if (auto it = cache.find(key); it != cache.end())
|
||||
return it->second;
|
||||
}
|
||||
|
||||
Result res = std::apply(Func, key);
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
cache.emplace(std::move(key), res);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
template <class ...Args>
|
||||
void update(Args && ...args)
|
||||
{
|
||||
Key key{std::forward<Args>(args)...};
|
||||
Result res = std::apply(Func, key);
|
||||
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
// TODO Can't use emplace(std::move(key), ..), causes test_host_ip_change errors.
|
||||
cache[key] = std::move(res);
|
||||
}
|
||||
}
|
||||
|
||||
size_t size() const
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
return cache.size();
|
||||
}
|
||||
|
||||
void drop()
|
||||
{
|
||||
std::lock_guard lock(mutex);
|
||||
cache.clear();
|
||||
}
|
||||
};
|
@ -11,49 +11,86 @@ cmake_push_check_state ()
|
||||
# All of them are unrelated to the instruction set at the host machine
|
||||
# (you can compile for newer instruction set on old machines and vice versa).
|
||||
|
||||
option (ENABLE_SSSE3 "Use SSSE3 instructions on x86_64" 1)
|
||||
option (ENABLE_SSE41 "Use SSE4.1 instructions on x86_64" 1)
|
||||
option (ENABLE_SSE42 "Use SSE4.2 instructions on x86_64" 1)
|
||||
option (ENABLE_PCLMULQDQ "Use pclmulqdq instructions on x86_64" 1)
|
||||
option (ENABLE_POPCNT "Use popcnt instructions on x86_64" 1)
|
||||
option (ENABLE_AVX "Use AVX instructions on x86_64" 0)
|
||||
option (ENABLE_AVX2 "Use AVX2 instructions on x86_64" 0)
|
||||
option (ENABLE_AVX512 "Use AVX512 instructions on x86_64" 0)
|
||||
option (ENABLE_AVX512_VBMI "Use AVX512_VBMI instruction on x86_64 (depends on ENABLE_AVX512)" 0)
|
||||
option (ENABLE_BMI "Use BMI instructions on x86_64" 0)
|
||||
option (ENABLE_AVX2_FOR_SPEC_OP "Use avx2 instructions for specific operations on x86_64" 0)
|
||||
option (ENABLE_AVX512_FOR_SPEC_OP "Use avx512 instructions for specific operations on x86_64" 0)
|
||||
|
||||
# X86: Allow compilation for a SSE2-only target machine. Done by a special build in CI for embedded or very old hardware.
|
||||
option (NO_SSE3_OR_HIGHER "Disable SSE3 or higher on x86_64" 0)
|
||||
if (NO_SSE3_OR_HIGHER)
|
||||
SET(ENABLE_SSSE3 0)
|
||||
SET(ENABLE_SSE41 0)
|
||||
SET(ENABLE_SSE42 0)
|
||||
SET(ENABLE_PCLMULQDQ 0)
|
||||
SET(ENABLE_POPCNT 0)
|
||||
SET(ENABLE_AVX 0)
|
||||
SET(ENABLE_AVX2 0)
|
||||
SET(ENABLE_AVX512 0)
|
||||
SET(ENABLE_AVX512_VBMI 0)
|
||||
SET(ENABLE_BMI 0)
|
||||
SET(ENABLE_AVX2_FOR_SPEC_OP 0)
|
||||
SET(ENABLE_AVX512_FOR_SPEC_OP 0)
|
||||
endif()
|
||||
|
||||
option (ARCH_NATIVE "Add -march=native compiler flag. This makes your binaries non-portable but more performant code may be generated. This option overrides ENABLE_* options for specific instruction set. Highly not recommended to use." 0)
|
||||
|
||||
if (ARCH_NATIVE)
|
||||
set (COMPILER_FLAGS "${COMPILER_FLAGS} -march=native")
|
||||
|
||||
elseif (ARCH_AARCH64)
|
||||
set (COMPILER_FLAGS "${COMPILER_FLAGS} -march=armv8-a+crc+simd+crypto+dotprod+ssbs")
|
||||
# ARM publishes almost every year a new revision of it's ISA [1]. Each version comes with new mandatory and optional features from
|
||||
# which CPU vendors can pick and choose. This creates a lot of variability ... We provide two build "profiles", one for maximum
|
||||
# compatibility intended to run on all 64-bit ARM hardware released after 2013 (e.g. Raspberry Pi 4), and one for modern ARM server
|
||||
# CPUs, (e.g. Graviton).
|
||||
#
|
||||
# [1] https://en.wikipedia.org/wiki/AArch64
|
||||
option (NO_ARMV81_OR_HIGHER "Disable ARMv8.1 or higher on Aarch64 for maximum compatibility with older/embedded hardware." 0)
|
||||
|
||||
if (NO_ARMV81_OR_HIGHER)
|
||||
# crc32 is optional in v8.0 and mandatory in v8.1. Enable it as __crc32()* is used in lot's of places and even very old ARM CPUs
|
||||
# support it.
|
||||
set (COMPILER_FLAGS "${COMPILER_FLAGS} -march=armv8+crc")
|
||||
else ()
|
||||
# ARMv8.2 is quite ancient but the lowest common denominator supported by both Graviton 2 and 3 processors [1]. In particular, it
|
||||
# includes LSE (made mandatory with ARMv8.1) which provides nice speedups without having to fall back to compat flag
|
||||
# "-moutline-atomics" for v8.0 [2, 3, 4] that requires a recent glibc with runtime dispatch helper, limiting our ability to run on
|
||||
# old OSs.
|
||||
#
|
||||
# simd: NEON, introduced as optional in v8.0, A few extensions were added with v8.1 but it's still not mandatory. Enables the
|
||||
# compiler to auto-vectorize.
|
||||
# sve: Scalable Vector Extensions, introduced as optional in v8.2. Available in Graviton 3 but not in Graviton 2, and most likely
|
||||
# also not in CI machines. Compiler support for autovectorization is rudimentary at the time of writing, see [5]. Can be
|
||||
# enabled one-fine-day (TM) but not now.
|
||||
# ssbs: "Speculative Store Bypass Safe". Optional in v8.0, mandatory in v8.5. Meltdown/spectre countermeasure.
|
||||
# crypto: SHA1, SHA256, AES. Optional in v8.0. In v8.4, further algorithms were added but it's still optional, see [6].
|
||||
# dotprod: Scalar vector product (SDOT and UDOT instructions). Probably the most obscure extra flag with doubtful performance benefits
|
||||
# but it has been activated since always, so why not enable it. It's not 100% clear in which revision this flag was
|
||||
# introduced as optional, either in v8.2 [7] or in v8.4 [8].
|
||||
#
|
||||
# [1] https://github.com/aws/aws-graviton-getting-started/blob/main/c-c%2B%2B.md
|
||||
# [2] https://community.arm.com/arm-community-blogs/b/tools-software-ides-blog/posts/making-the-most-of-the-arm-architecture-in-gcc-10
|
||||
# [3] https://mysqlonarm.github.io/ARM-LSE-and-MySQL/
|
||||
# [4] https://dev.to/aws-builders/large-system-extensions-for-aws-graviton-processors-3eci
|
||||
# [5] https://developer.arm.com/tools-and-software/open-source-software/developer-tools/llvm-toolchain/sve-support
|
||||
# [6] https://developer.arm.com/documentation/100067/0612/armclang-Command-line-Options/-mcpu?lang=en
|
||||
# [7] https://gcc.gnu.org/onlinedocs/gcc/ARM-Options.html
|
||||
# [8] https://developer.arm.com/documentation/102651/a/What-are-dot-product-intructions-
|
||||
set (COMPILER_FLAGS "${COMPILER_FLAGS} -march=armv8.2-a+simd+crypto+dotprod+ssbs")
|
||||
endif ()
|
||||
|
||||
elseif (ARCH_PPC64LE)
|
||||
# Note that gcc and clang have support for x86 SSE2 intrinsics when building for PowerPC
|
||||
set (COMPILER_FLAGS "${COMPILER_FLAGS} -maltivec -mcpu=power8 -D__SSE2__=1 -DNO_WARN_X86_INTRINSICS")
|
||||
|
||||
elseif (ARCH_AMD64)
|
||||
option (ENABLE_SSSE3 "Use SSSE3 instructions on x86_64" 1)
|
||||
option (ENABLE_SSE41 "Use SSE4.1 instructions on x86_64" 1)
|
||||
option (ENABLE_SSE42 "Use SSE4.2 instructions on x86_64" 1)
|
||||
option (ENABLE_PCLMULQDQ "Use pclmulqdq instructions on x86_64" 1)
|
||||
option (ENABLE_POPCNT "Use popcnt instructions on x86_64" 1)
|
||||
option (ENABLE_AVX "Use AVX instructions on x86_64" 0)
|
||||
option (ENABLE_AVX2 "Use AVX2 instructions on x86_64" 0)
|
||||
option (ENABLE_AVX512 "Use AVX512 instructions on x86_64" 0)
|
||||
option (ENABLE_AVX512_VBMI "Use AVX512_VBMI instruction on x86_64 (depends on ENABLE_AVX512)" 0)
|
||||
option (ENABLE_BMI "Use BMI instructions on x86_64" 0)
|
||||
option (ENABLE_AVX2_FOR_SPEC_OP "Use avx2 instructions for specific operations on x86_64" 0)
|
||||
option (ENABLE_AVX512_FOR_SPEC_OP "Use avx512 instructions for specific operations on x86_64" 0)
|
||||
|
||||
option (NO_SSE3_OR_HIGHER "Disable SSE3 or higher on x86_64 for maximum compatibility with older/embedded hardware." 0)
|
||||
if (NO_SSE3_OR_HIGHER)
|
||||
SET(ENABLE_SSSE3 0)
|
||||
SET(ENABLE_SSE41 0)
|
||||
SET(ENABLE_SSE42 0)
|
||||
SET(ENABLE_PCLMULQDQ 0)
|
||||
SET(ENABLE_POPCNT 0)
|
||||
SET(ENABLE_AVX 0)
|
||||
SET(ENABLE_AVX2 0)
|
||||
SET(ENABLE_AVX512 0)
|
||||
SET(ENABLE_AVX512_VBMI 0)
|
||||
SET(ENABLE_BMI 0)
|
||||
SET(ENABLE_AVX2_FOR_SPEC_OP 0)
|
||||
SET(ENABLE_AVX512_FOR_SPEC_OP 0)
|
||||
endif()
|
||||
|
||||
set (TEST_FLAG "-mssse3")
|
||||
set (CMAKE_REQUIRED_FLAGS "${TEST_FLAG} -O0")
|
||||
check_cxx_source_compiles("
|
||||
|
@ -128,6 +128,7 @@ def parse_env_variables(
|
||||
DARWIN_SUFFIX = "-darwin"
|
||||
DARWIN_ARM_SUFFIX = "-darwin-aarch64"
|
||||
ARM_SUFFIX = "-aarch64"
|
||||
ARM_V80COMPAT_SUFFIX = "-aarch64-v80compat"
|
||||
FREEBSD_SUFFIX = "-freebsd"
|
||||
PPC_SUFFIX = "-ppc64le"
|
||||
AMD64_SSE2_SUFFIX = "-amd64sse2"
|
||||
@ -140,6 +141,7 @@ def parse_env_variables(
|
||||
is_cross_darwin = compiler.endswith(DARWIN_SUFFIX)
|
||||
is_cross_darwin_arm = compiler.endswith(DARWIN_ARM_SUFFIX)
|
||||
is_cross_arm = compiler.endswith(ARM_SUFFIX)
|
||||
is_cross_arm_v80compat = compiler.endswith(ARM_V80COMPAT_SUFFIX)
|
||||
is_cross_ppc = compiler.endswith(PPC_SUFFIX)
|
||||
is_cross_freebsd = compiler.endswith(FREEBSD_SUFFIX)
|
||||
is_amd64_sse2 = compiler.endswith(AMD64_SSE2_SUFFIX)
|
||||
@ -178,6 +180,13 @@ def parse_env_variables(
|
||||
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-aarch64.cmake"
|
||||
)
|
||||
result.append("DEB_ARCH=arm64")
|
||||
elif is_cross_arm_v80compat:
|
||||
cc = compiler[: -len(ARM_V80COMPAT_SUFFIX)]
|
||||
cmake_flags.append(
|
||||
"-DCMAKE_TOOLCHAIN_FILE=/build/cmake/linux/toolchain-aarch64.cmake"
|
||||
)
|
||||
cmake_flags.append("-DNO_ARMV81_OR_HIGHER=1")
|
||||
result.append("DEB_ARCH=arm64")
|
||||
elif is_cross_freebsd:
|
||||
cc = compiler[: -len(FREEBSD_SUFFIX)]
|
||||
cmake_flags.append(
|
||||
@ -343,6 +352,7 @@ if __name__ == "__main__":
|
||||
"clang-15-darwin",
|
||||
"clang-15-darwin-aarch64",
|
||||
"clang-15-aarch64",
|
||||
"clang-15-aarch64-v80compat",
|
||||
"clang-15-ppc64le",
|
||||
"clang-15-amd64sse2",
|
||||
"clang-15-freebsd",
|
||||
|
@ -33,7 +33,7 @@ RUN arch=${TARGETARCH:-amd64} \
|
||||
# lts / testing / prestable / etc
|
||||
ARG REPO_CHANNEL="stable"
|
||||
ARG REPOSITORY="https://packages.clickhouse.com/tgz/${REPO_CHANNEL}"
|
||||
ARG VERSION="22.8.5.29"
|
||||
ARG VERSION="22.9.2.7"
|
||||
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
|
||||
|
||||
# user/group precreated explicitly with fixed uid/gid on purpose.
|
||||
|
@ -21,7 +21,7 @@ RUN sed -i "s|http://archive.ubuntu.com|${apt_archive}|g" /etc/apt/sources.list
|
||||
|
||||
ARG REPO_CHANNEL="stable"
|
||||
ARG REPOSITORY="deb https://packages.clickhouse.com/deb ${REPO_CHANNEL} main"
|
||||
ARG VERSION="22.8.5.29"
|
||||
ARG VERSION="22.9.2.7"
|
||||
ARG PACKAGES="clickhouse-client clickhouse-server clickhouse-common-static"
|
||||
|
||||
# set non-empty deb_location_url url to create a docker image
|
||||
|
@ -243,7 +243,7 @@ export USE_S3_STORAGE_FOR_MERGE_TREE=1
|
||||
configure
|
||||
|
||||
# But we still need default disk because some tables loaded only into it
|
||||
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<disk>s3</disk>|<disk>s3</disk><disk>default</disk>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
|
||||
sudo cat /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml | sed "s|<main><disk>s3</disk></main>|<main><disk>s3</disk></main><default><disk>default</disk></default>|" > /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp
|
||||
mv /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml.tmp /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chown clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
sudo chgrp clickhouse /etc/clickhouse-server/config.d/s3_storage_policy_by_default.xml
|
||||
|
20
docs/changelogs/v22.9.2.7-stable.md
Normal file
20
docs/changelogs/v22.9.2.7-stable.md
Normal file
@ -0,0 +1,20 @@
|
||||
---
|
||||
sidebar_position: 1
|
||||
sidebar_label: 2022
|
||||
---
|
||||
|
||||
# 2022 Changelog
|
||||
|
||||
### ClickHouse release v22.9.2.7-stable (362e2cefcef) FIXME as compared to v22.9.1.2603-stable (3030d4c7ff0)
|
||||
|
||||
#### Improvement
|
||||
* Backported in [#41709](https://github.com/ClickHouse/ClickHouse/issues/41709): Check file path for path traversal attacks in errors logger for input formats. [#41694](https://github.com/ClickHouse/ClickHouse/pull/41694) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
|
||||
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
|
||||
|
||||
* Backported in [#41696](https://github.com/ClickHouse/ClickHouse/issues/41696): Fixes issue when docker run will fail if "https_port" is not present in config. [#41693](https://github.com/ClickHouse/ClickHouse/pull/41693) ([Yakov Olkhovskiy](https://github.com/yakov-olkhovskiy)).
|
||||
|
||||
#### NOT FOR CHANGELOG / INSIGNIFICANT
|
||||
|
||||
* Fix typos in JSON formats after [#40910](https://github.com/ClickHouse/ClickHouse/issues/40910) [#41614](https://github.com/ClickHouse/ClickHouse/pull/41614) ([Kruglov Pavel](https://github.com/Avogar)).
|
||||
|
@ -131,7 +131,7 @@ Example of configuration for versions later or equal to 22.8:
|
||||
<type>cache</type>
|
||||
<disk>s3</disk>
|
||||
<path>/s3_cache/</path>
|
||||
<max_size>10000000</max_size>
|
||||
<max_size>10Gi</max_size>
|
||||
</cache>
|
||||
</disks>
|
||||
<policies>
|
||||
@ -155,7 +155,7 @@ Example of configuration for versions earlier than 22.8:
|
||||
<endpoint>...</endpoint>
|
||||
... s3 configuration ...
|
||||
<data_cache_enabled>1</data_cache_enabled>
|
||||
<data_cache_max_size>10000000</data_cache_max_size>
|
||||
<data_cache_max_size>10737418240</data_cache_max_size>
|
||||
</s3>
|
||||
</disks>
|
||||
<policies>
|
||||
@ -172,7 +172,7 @@ Cache **configuration settings**:
|
||||
|
||||
- `path` - path to the directory with cache. Default: None, this setting is obligatory.
|
||||
|
||||
- `max_size` - maximum size of the cache in bytes. When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
|
||||
- `max_size` - maximum size of the cache in bytes or in readable format, e.g. `ki, Mi, Gi, etc`, example `10Gi` (such format works starting from `22.10` version). When the limit is reached, cache files are evicted according to the cache eviction policy. Default: None, this setting is obligatory.
|
||||
|
||||
- `cache_on_write_operations` - allow to turn on `write-through` cache (caching data on any write operations: `INSERT` queries, background merges). Default: `false`. The `write-through` cache can be disabled per query using setting `enable_filesystem_cache_on_write_operations` (data is cached only if both cache config settings and corresponding query setting are enabled).
|
||||
|
||||
@ -182,7 +182,7 @@ Cache **configuration settings**:
|
||||
|
||||
- `do_not_evict_index_and_mark_files` - do not evict small frequently used files according to cache policy. Default: `false`. This setting was added in version 22.8. If you used filesystem cache before this version, then it will not work on versions starting from 22.8 if this setting is set to `true`. If you want to use this setting, clear old cache created before version 22.8 before upgrading.
|
||||
|
||||
- `max_file_segment_size` - a maximum size of a single cache file. Default: `104857600` (100 Mb).
|
||||
- `max_file_segment_size` - a maximum size of a single cache file in bytes or in readable format (`ki, Mi, Gi, etc`, example `10Gi`). Default: `104857600` (`100Mi`).
|
||||
|
||||
- `max_elements` - a limit for a number of cache files. Default: `1048576`.
|
||||
|
||||
|
@ -179,6 +179,9 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
disconnect();
|
||||
|
||||
/// Remove this possible stale entry from cache
|
||||
DNSResolver::instance().removeHostFromCache(host);
|
||||
|
||||
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
|
||||
throw NetException(e.displayText() + " (" + getDescription() + ")", ErrorCodes::NETWORK_ERROR);
|
||||
}
|
||||
@ -186,6 +189,9 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
|
||||
{
|
||||
disconnect();
|
||||
|
||||
/// Remove this possible stale entry from cache
|
||||
DNSResolver::instance().removeHostFromCache(host);
|
||||
|
||||
/// Add server address to exception. Also Exception will remember stack trace. It's a pity that more precise exception type is lost.
|
||||
/// This exception can only be thrown from socket->connect(), so add information about connection timeout.
|
||||
const auto & connection_timeout = static_cast<bool>(secure) ? timeouts.secure_connection_timeout : timeouts.connection_timeout;
|
||||
|
@ -12,14 +12,12 @@
|
||||
#include <Common/RadixSort.h>
|
||||
#include <Common/SipHash.h>
|
||||
#include <Common/WeakHash.h>
|
||||
#include <Common/TargetSpecific.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <base/sort.h>
|
||||
#include <base/unaligned.h>
|
||||
#include <base/bit_cast.h>
|
||||
#include <base/scope_guard.h>
|
||||
|
||||
#include <bit>
|
||||
#include <cmath>
|
||||
#include <cstring>
|
||||
|
||||
@ -27,10 +25,6 @@
|
||||
# include <emmintrin.h>
|
||||
#endif
|
||||
|
||||
#if USE_MULTITARGET_CODE
|
||||
# include <immintrin.h>
|
||||
#endif
|
||||
|
||||
#if USE_EMBEDDED_COMPILER
|
||||
#include <DataTypes/Native.h>
|
||||
#include <llvm/IR/IRBuilder.h>
|
||||
@ -477,128 +471,6 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
|
||||
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
|
||||
}
|
||||
|
||||
static inline UInt64 blsr(UInt64 mask)
|
||||
{
|
||||
#ifdef __BMI__
|
||||
return _blsr_u64(mask);
|
||||
#else
|
||||
return mask & (mask-1);
|
||||
#endif
|
||||
}
|
||||
|
||||
DECLARE_DEFAULT_CODE(
|
||||
template <typename T, typename Container, size_t SIMD_BYTES>
|
||||
inline void doFilterAligned(const UInt8 *& filt_pos, const UInt8 *& filt_end_aligned, const T *& data_pos, Container & res_data)
|
||||
{
|
||||
while (filt_pos < filt_end_aligned)
|
||||
{
|
||||
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
|
||||
|
||||
if (0xffffffffffffffff == mask)
|
||||
{
|
||||
res_data.insert(data_pos, data_pos + SIMD_BYTES);
|
||||
}
|
||||
else
|
||||
{
|
||||
while (mask)
|
||||
{
|
||||
size_t index = std::countr_zero(mask);
|
||||
res_data.push_back(data_pos[index]);
|
||||
mask = blsr(mask);
|
||||
}
|
||||
}
|
||||
|
||||
filt_pos += SIMD_BYTES;
|
||||
data_pos += SIMD_BYTES;
|
||||
}
|
||||
}
|
||||
)
|
||||
|
||||
namespace
|
||||
{
|
||||
template <typename T, typename Container>
|
||||
void resize(Container & res_data, size_t reserve_size)
|
||||
{
|
||||
#if defined(MEMORY_SANITIZER)
|
||||
res_data.resize_fill(reserve_size, static_cast<T>(0)); // MSan doesn't recognize that all allocated memory is written by AVX-512 intrinsics.
|
||||
#else
|
||||
res_data.resize(reserve_size);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
DECLARE_AVX512VBMI2_SPECIFIC_CODE(
|
||||
template <size_t ELEMENT_WIDTH>
|
||||
inline void compressStoreAVX512(const void *src, void *dst, const UInt64 mask)
|
||||
{
|
||||
__m512i vsrc = _mm512_loadu_si512(src);
|
||||
if constexpr (ELEMENT_WIDTH == 1)
|
||||
_mm512_mask_compressstoreu_epi8(dst, static_cast<__mmask64>(mask), vsrc);
|
||||
else if constexpr (ELEMENT_WIDTH == 2)
|
||||
_mm512_mask_compressstoreu_epi16(dst, static_cast<__mmask32>(mask), vsrc);
|
||||
else if constexpr (ELEMENT_WIDTH == 4)
|
||||
_mm512_mask_compressstoreu_epi32(dst, static_cast<__mmask16>(mask), vsrc);
|
||||
else if constexpr (ELEMENT_WIDTH == 8)
|
||||
_mm512_mask_compressstoreu_epi64(dst, static_cast<__mmask8>(mask), vsrc);
|
||||
}
|
||||
|
||||
template <typename T, typename Container, size_t SIMD_BYTES>
|
||||
inline void doFilterAligned(const UInt8 *& filt_pos, const UInt8 *& filt_end_aligned, const T *& data_pos, Container & res_data)
|
||||
{
|
||||
static constexpr size_t VEC_LEN = 64; /// AVX512 vector length - 64 bytes
|
||||
static constexpr size_t ELEMENT_WIDTH = sizeof(T);
|
||||
static constexpr size_t ELEMENTS_PER_VEC = VEC_LEN / ELEMENT_WIDTH;
|
||||
static constexpr UInt64 KMASK = 0xffffffffffffffff >> (64 - ELEMENTS_PER_VEC);
|
||||
|
||||
size_t current_offset = res_data.size();
|
||||
size_t reserve_size = res_data.size();
|
||||
size_t alloc_size = SIMD_BYTES * 2;
|
||||
|
||||
while (filt_pos < filt_end_aligned)
|
||||
{
|
||||
/// to avoid calling resize too frequently, resize to reserve buffer.
|
||||
if (reserve_size - current_offset < SIMD_BYTES)
|
||||
{
|
||||
reserve_size += alloc_size;
|
||||
resize<T>(res_data, reserve_size);
|
||||
alloc_size *= 2;
|
||||
}
|
||||
|
||||
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
|
||||
|
||||
if (0xffffffffffffffff == mask)
|
||||
{
|
||||
for (size_t i = 0; i < SIMD_BYTES; i += ELEMENTS_PER_VEC)
|
||||
_mm512_storeu_si512(reinterpret_cast<void *>(&res_data[current_offset + i]),
|
||||
_mm512_loadu_si512(reinterpret_cast<const void *>(data_pos + i)));
|
||||
current_offset += SIMD_BYTES;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (mask)
|
||||
{
|
||||
for (size_t i = 0; i < SIMD_BYTES; i += ELEMENTS_PER_VEC)
|
||||
{
|
||||
compressStoreAVX512<ELEMENT_WIDTH>(reinterpret_cast<const void *>(data_pos + i),
|
||||
reinterpret_cast<void *>(&res_data[current_offset]), mask & KMASK);
|
||||
current_offset += std::popcount(mask & KMASK);
|
||||
/// prepare mask for next iter, if ELEMENTS_PER_VEC = 64, no next iter
|
||||
if (ELEMENTS_PER_VEC < 64)
|
||||
{
|
||||
mask >>= ELEMENTS_PER_VEC;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
filt_pos += SIMD_BYTES;
|
||||
data_pos += SIMD_BYTES;
|
||||
}
|
||||
/// resize to the real size.
|
||||
res_data.resize(current_offset);
|
||||
}
|
||||
)
|
||||
|
||||
template <typename T>
|
||||
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
|
||||
{
|
||||
@ -624,13 +496,31 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
|
||||
static constexpr size_t SIMD_BYTES = 64;
|
||||
const UInt8 * filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES;
|
||||
|
||||
#if USE_MULTITARGET_CODE
|
||||
static constexpr bool VBMI2_CAPABLE = sizeof(T) == 1 || sizeof(T) == 2 || sizeof(T) == 4 || sizeof(T) == 8;
|
||||
if (VBMI2_CAPABLE && isArchSupported(TargetArch::AVX512VBMI2))
|
||||
TargetSpecific::AVX512VBMI2::doFilterAligned<T, Container, SIMD_BYTES>(filt_pos, filt_end_aligned, data_pos, res_data);
|
||||
else
|
||||
#endif
|
||||
TargetSpecific::Default::doFilterAligned<T, Container, SIMD_BYTES>(filt_pos, filt_end_aligned, data_pos, res_data);
|
||||
while (filt_pos < filt_end_aligned)
|
||||
{
|
||||
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
|
||||
|
||||
if (0xffffffffffffffff == mask)
|
||||
{
|
||||
res_data.insert(data_pos, data_pos + SIMD_BYTES);
|
||||
}
|
||||
else
|
||||
{
|
||||
while (mask)
|
||||
{
|
||||
size_t index = std::countr_zero(mask);
|
||||
res_data.push_back(data_pos[index]);
|
||||
#ifdef __BMI__
|
||||
mask = _blsr_u64(mask);
|
||||
#else
|
||||
mask = mask & (mask-1);
|
||||
#endif
|
||||
}
|
||||
}
|
||||
|
||||
filt_pos += SIMD_BYTES;
|
||||
data_pos += SIMD_BYTES;
|
||||
}
|
||||
|
||||
while (filt_pos < filt_end)
|
||||
{
|
||||
|
@ -1,158 +0,0 @@
|
||||
#include <limits>
|
||||
#include <typeinfo>
|
||||
#include <vector>
|
||||
#include <Columns/ColumnsNumber.h>
|
||||
#include <Common/randomSeed.h>
|
||||
#include <gtest/gtest.h>
|
||||
|
||||
using namespace DB;
|
||||
|
||||
static pcg64 rng(randomSeed());
|
||||
static constexpr int error_code = 12345;
|
||||
static constexpr size_t TEST_RUNS = 500;
|
||||
static constexpr size_t MAX_ROWS = 10000;
|
||||
static const std::vector<size_t> filter_ratios = {1, 2, 5, 11, 32, 64, 100, 1000};
|
||||
static const size_t K = filter_ratios.size();
|
||||
|
||||
template <typename T>
|
||||
static MutableColumnPtr createColumn(size_t n)
|
||||
{
|
||||
auto column = ColumnVector<T>::create();
|
||||
auto & values = column->getData();
|
||||
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
values.push_back(i);
|
||||
}
|
||||
|
||||
return column;
|
||||
}
|
||||
|
||||
bool checkFilter(const PaddedPODArray<UInt8> &flit, const IColumn & src, const IColumn & dst)
|
||||
{
|
||||
size_t n = flit.size();
|
||||
size_t dst_size = dst.size();
|
||||
size_t j = 0; /// index of dest
|
||||
for (size_t i = 0; i < n; ++i)
|
||||
{
|
||||
if (flit[i] != 0)
|
||||
{
|
||||
if ((dst_size <= j) || (src.compareAt(i, j, dst, 0) != 0))
|
||||
return false;
|
||||
j++;
|
||||
}
|
||||
}
|
||||
return dst_size == j; /// filtered size check
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static void testFilter()
|
||||
{
|
||||
auto test_case = [&](size_t rows, size_t filter_ratio)
|
||||
{
|
||||
auto vector_column = createColumn<T>(rows);
|
||||
PaddedPODArray<UInt8> flit(rows);
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
flit[i] = rng() % filter_ratio == 0;
|
||||
auto res_column = vector_column->filter(flit, -1);
|
||||
|
||||
if (!checkFilter(flit, *vector_column, *res_column))
|
||||
throw Exception(error_code, "VectorColumn filter failure, type: {}", typeid(T).name());
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < TEST_RUNS; ++i)
|
||||
{
|
||||
size_t rows = rng() % MAX_ROWS + 1;
|
||||
size_t filter_ratio = filter_ratios[rng() % K];
|
||||
|
||||
test_case(rows, filter_ratio);
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
FAIL() << e.displayText();
|
||||
}
|
||||
}
|
||||
|
||||
TEST(ColumnVector, Filter)
|
||||
{
|
||||
testFilter<UInt8>();
|
||||
testFilter<Int16>();
|
||||
testFilter<UInt32>();
|
||||
testFilter<Int64>();
|
||||
testFilter<UInt128>();
|
||||
testFilter<Int256>();
|
||||
testFilter<Float32>();
|
||||
testFilter<Float64>();
|
||||
testFilter<UUID>();
|
||||
}
|
||||
|
||||
template <typename T>
|
||||
static MutableColumnPtr createIndexColumn(size_t limit, size_t rows)
|
||||
{
|
||||
auto column = ColumnVector<T>::create();
|
||||
auto & values = column->getData();
|
||||
auto max = std::numeric_limits<T>::max();
|
||||
limit = limit > max ? max : limit;
|
||||
|
||||
for (size_t i = 0; i < rows; ++i)
|
||||
{
|
||||
T val = rng() % limit;
|
||||
values.push_back(val);
|
||||
}
|
||||
|
||||
return column;
|
||||
}
|
||||
|
||||
template <typename T, typename IndexType>
|
||||
static void testIndex()
|
||||
{
|
||||
static const std::vector<size_t> column_sizes = {64, 128, 196, 256, 512};
|
||||
|
||||
auto test_case = [&](size_t rows, size_t index_rows, size_t limit)
|
||||
{
|
||||
auto vector_column = createColumn<T>(rows);
|
||||
auto index_column = createIndexColumn<IndexType>(rows, index_rows);
|
||||
auto res_column = vector_column->index(*index_column, limit);
|
||||
if (limit == 0)
|
||||
limit = index_column->size();
|
||||
|
||||
/// check results
|
||||
if (limit != res_column->size())
|
||||
throw Exception(error_code, "ColumnVector index size not match to limit: {} {}", typeid(T).name(), typeid(IndexType).name());
|
||||
for (size_t i = 0; i < limit; ++i)
|
||||
{
|
||||
/// vector_column data is the same as index, so indexed column's value will equals to index_column.
|
||||
if (res_column->get64(i) != index_column->get64(i))
|
||||
throw Exception(error_code, "ColumnVector index fail: {} {}", typeid(T).name(), typeid(IndexType).name());
|
||||
}
|
||||
};
|
||||
|
||||
try
|
||||
{
|
||||
for (size_t i = 0; i < TEST_RUNS; ++i)
|
||||
{
|
||||
/// make sure rows distribute in (column_sizes[r-1], colulmn_sizes[r]]
|
||||
size_t row_idx = rng() % column_sizes.size();
|
||||
size_t row_base = row_idx > 0 ? column_sizes[row_idx - 1] : 0;
|
||||
size_t rows = row_base + (rng() % (column_sizes[row_idx] - row_base) + 1);
|
||||
size_t index_rows = rng() % MAX_ROWS + 1;
|
||||
|
||||
test_case(rows, index_rows, 0);
|
||||
test_case(rows, index_rows, static_cast<size_t>(0.5 * index_rows));
|
||||
}
|
||||
}
|
||||
catch (const Exception & e)
|
||||
{
|
||||
FAIL() << e.displayText();
|
||||
}
|
||||
}
|
||||
|
||||
TEST(ColumnVector, Index)
|
||||
{
|
||||
testIndex<UInt8, UInt8>();
|
||||
testIndex<UInt16, UInt8>();
|
||||
testIndex<UInt16, UInt16>();
|
||||
}
|
@ -82,7 +82,6 @@ inline bool cpuid(UInt32 op, UInt32 * res) noexcept /// NOLINT
|
||||
OP(AVX512BW) \
|
||||
OP(AVX512VL) \
|
||||
OP(AVX512VBMI) \
|
||||
OP(AVX512VBMI2) \
|
||||
OP(PREFETCHWT1) \
|
||||
OP(SHA) \
|
||||
OP(ADX) \
|
||||
@ -303,11 +302,6 @@ bool haveAVX512VBMI() noexcept
|
||||
return haveAVX512F() && ((CpuInfo(0x7, 0).registers.ecx >> 1) & 1u);
|
||||
}
|
||||
|
||||
bool haveAVX512VBMI2() noexcept
|
||||
{
|
||||
return haveAVX512F() && ((CpuInfo(0x7, 0).registers.ecx >> 6) & 1u);
|
||||
}
|
||||
|
||||
bool haveRDRAND() noexcept
|
||||
{
|
||||
return CpuInfo(0x0).registers.eax >= 0x7 && ((CpuInfo(0x1).registers.ecx >> 30) & 1u);
|
||||
|
@ -1,7 +1,8 @@
|
||||
#include "DNSResolver.h"
|
||||
#include <base/CachedFn.h>
|
||||
#include <Common/CacheBase.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <Common/ProfileEvents.h>
|
||||
#include <Common/thread_local_rng.h>
|
||||
#include <Core/Names.h>
|
||||
#include <base/types.h>
|
||||
#include <Poco/Net/IPAddress.h>
|
||||
@ -12,6 +13,7 @@
|
||||
#include <atomic>
|
||||
#include <optional>
|
||||
#include <string_view>
|
||||
#include <unordered_set>
|
||||
#include "DNSPTRResolverProvider.h"
|
||||
|
||||
namespace ProfileEvents
|
||||
@ -41,9 +43,11 @@ namespace ErrorCodes
|
||||
extern const int DNS_ERROR;
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
/// Slightly altered implementation from https://github.com/pocoproject/poco/blob/poco-1.6.1/Net/src/SocketAddress.cpp#L86
|
||||
static void splitHostAndPort(const std::string & host_and_port, std::string & out_host, UInt16 & out_port)
|
||||
void splitHostAndPort(const std::string & host_and_port, std::string & out_host, UInt16 & out_port)
|
||||
{
|
||||
String port_str;
|
||||
out_host.clear();
|
||||
@ -84,7 +88,7 @@ static void splitHostAndPort(const std::string & host_and_port, std::string & ou
|
||||
throw Exception("Port must be numeric", ErrorCodes::BAD_ARGUMENTS);
|
||||
}
|
||||
|
||||
static DNSResolver::IPAddresses hostByName(const std::string & host)
|
||||
DNSResolver::IPAddresses hostByName(const std::string & host)
|
||||
{
|
||||
/// Do not resolve IPv6 (or IPv4) if no local IPv6 (or IPv4) addresses are configured.
|
||||
/// It should not affect client address checking, since client cannot connect from IPv6 address
|
||||
@ -112,7 +116,7 @@ static DNSResolver::IPAddresses hostByName(const std::string & host)
|
||||
return addresses;
|
||||
}
|
||||
|
||||
static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
|
||||
DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
|
||||
{
|
||||
Poco::Net::IPAddress ip;
|
||||
|
||||
@ -136,7 +140,13 @@ static DNSResolver::IPAddresses resolveIPAddressImpl(const std::string & host)
|
||||
return addresses;
|
||||
}
|
||||
|
||||
static std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
|
||||
DNSResolver::IPAddresses resolveIPAddressWithCache(CacheBase<std::string, DNSResolver::IPAddresses> & cache, const std::string & host)
|
||||
{
|
||||
auto [result, _ ] = cache.getOrSet(host, [&host]() { return std::make_shared<DNSResolver::IPAddresses>(resolveIPAddressImpl(host)); });
|
||||
return *result;
|
||||
}
|
||||
|
||||
std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress & address)
|
||||
{
|
||||
auto ptr_resolver = DB::DNSPTRResolverProvider::get();
|
||||
|
||||
@ -149,13 +159,27 @@ static std::unordered_set<String> reverseResolveImpl(const Poco::Net::IPAddress
|
||||
}
|
||||
}
|
||||
|
||||
std::unordered_set<String> reverseResolveWithCache(
|
||||
CacheBase<Poco::Net::IPAddress, std::unordered_set<std::string>> & cache, const Poco::Net::IPAddress & address)
|
||||
{
|
||||
auto [result, _ ] = cache.getOrSet(address, [&address]() { return std::make_shared<std::unordered_set<String>>(reverseResolveImpl(address)); });
|
||||
return *result;
|
||||
}
|
||||
|
||||
Poco::Net::IPAddress pickAddress(const DNSResolver::IPAddresses & addresses)
|
||||
{
|
||||
return addresses.front();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
struct DNSResolver::Impl
|
||||
{
|
||||
using HostWithConsecutiveFailures = std::unordered_map<String, UInt32>;
|
||||
using AddressWithConsecutiveFailures = std::unordered_map<Poco::Net::IPAddress, UInt32>;
|
||||
|
||||
CachedFn<&resolveIPAddressImpl> cache_host;
|
||||
CachedFn<&reverseResolveImpl> cache_address;
|
||||
CacheBase<std::string, DNSResolver::IPAddresses> cache_host{100};
|
||||
CacheBase<Poco::Net::IPAddress, std::unordered_set<std::string>> cache_address{100};
|
||||
|
||||
std::mutex drop_mutex;
|
||||
std::mutex update_mutex;
|
||||
@ -180,7 +204,7 @@ DNSResolver::DNSResolver() : impl(std::make_unique<DNSResolver::Impl>()), log(&P
|
||||
|
||||
Poco::Net::IPAddress DNSResolver::resolveHost(const std::string & host)
|
||||
{
|
||||
return resolveHostAll(host).front();
|
||||
return pickAddress(resolveHostAll(host));
|
||||
}
|
||||
|
||||
DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
|
||||
@ -189,7 +213,7 @@ DNSResolver::IPAddresses DNSResolver::resolveHostAll(const std::string & host)
|
||||
return resolveIPAddressImpl(host);
|
||||
|
||||
addToNewHosts(host);
|
||||
return impl->cache_host(host);
|
||||
return resolveIPAddressWithCache(impl->cache_host, host);
|
||||
}
|
||||
|
||||
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_and_port)
|
||||
@ -202,7 +226,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host_an
|
||||
splitHostAndPort(host_and_port, host, port);
|
||||
|
||||
addToNewHosts(host);
|
||||
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
|
||||
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
|
||||
}
|
||||
|
||||
Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, UInt16 port)
|
||||
@ -211,7 +235,7 @@ Poco::Net::SocketAddress DNSResolver::resolveAddress(const std::string & host, U
|
||||
return Poco::Net::SocketAddress(host, port);
|
||||
|
||||
addToNewHosts(host);
|
||||
return Poco::Net::SocketAddress(impl->cache_host(host).front(), port);
|
||||
return Poco::Net::SocketAddress(pickAddress(resolveIPAddressWithCache(impl->cache_host, host)), port);
|
||||
}
|
||||
|
||||
std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std::string & host, UInt16 port)
|
||||
@ -224,7 +248,7 @@ std::vector<Poco::Net::SocketAddress> DNSResolver::resolveAddressList(const std:
|
||||
if (!impl->disable_cache)
|
||||
addToNewHosts(host);
|
||||
|
||||
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : impl->cache_host(host);
|
||||
std::vector<Poco::Net::IPAddress> ips = impl->disable_cache ? hostByName(host) : resolveIPAddressWithCache(impl->cache_host, host);
|
||||
auto ips_end = std::unique(ips.begin(), ips.end());
|
||||
|
||||
addresses.reserve(ips_end - ips.begin());
|
||||
@ -240,13 +264,13 @@ std::unordered_set<String> DNSResolver::reverseResolve(const Poco::Net::IPAddres
|
||||
return reverseResolveImpl(address);
|
||||
|
||||
addToNewAddresses(address);
|
||||
return impl->cache_address(address);
|
||||
return reverseResolveWithCache(impl->cache_address, address);
|
||||
}
|
||||
|
||||
void DNSResolver::dropCache()
|
||||
{
|
||||
impl->cache_host.drop();
|
||||
impl->cache_address.drop();
|
||||
impl->cache_host.reset();
|
||||
impl->cache_address.reset();
|
||||
|
||||
std::scoped_lock lock(impl->update_mutex, impl->drop_mutex);
|
||||
|
||||
@ -257,6 +281,11 @@ void DNSResolver::dropCache()
|
||||
impl->host_name.reset();
|
||||
}
|
||||
|
||||
void DNSResolver::removeHostFromCache(const std::string & host)
|
||||
{
|
||||
impl->cache_host.remove(host);
|
||||
}
|
||||
|
||||
void DNSResolver::setDisableCacheFlag(bool is_disabled)
|
||||
{
|
||||
impl->disable_cache = is_disabled;
|
||||
@ -378,17 +407,20 @@ bool DNSResolver::updateCache(UInt32 max_consecutive_failures)
|
||||
|
||||
bool DNSResolver::updateHost(const String & host)
|
||||
{
|
||||
/// Usage of updateHost implies that host is already in cache and there is no extra computations
|
||||
auto old_value = impl->cache_host(host);
|
||||
impl->cache_host.update(host);
|
||||
return old_value != impl->cache_host(host);
|
||||
const auto old_value = resolveIPAddressWithCache(impl->cache_host, host);
|
||||
auto new_value = resolveIPAddressImpl(host);
|
||||
const bool result = old_value != new_value;
|
||||
impl->cache_host.set(host, std::make_shared<DNSResolver::IPAddresses>(std::move(new_value)));
|
||||
return result;
|
||||
}
|
||||
|
||||
bool DNSResolver::updateAddress(const Poco::Net::IPAddress & address)
|
||||
{
|
||||
auto old_value = impl->cache_address(address);
|
||||
impl->cache_address.update(address);
|
||||
return old_value == impl->cache_address(address);
|
||||
const auto old_value = reverseResolveWithCache(impl->cache_address, address);
|
||||
auto new_value = reverseResolveImpl(address);
|
||||
const bool result = old_value != new_value;
|
||||
impl->cache_address.set(address, std::make_shared<std::unordered_set<String>>(std::move(new_value)));
|
||||
return result;
|
||||
}
|
||||
|
||||
void DNSResolver::addToNewHosts(const String & host)
|
||||
|
@ -18,6 +18,7 @@ class DNSResolver : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
using IPAddresses = std::vector<Poco::Net::IPAddress>;
|
||||
using IPAddressesPtr = std::shared_ptr<IPAddresses>;
|
||||
|
||||
static DNSResolver & instance();
|
||||
|
||||
@ -48,6 +49,9 @@ public:
|
||||
/// Drops all caches
|
||||
void dropCache();
|
||||
|
||||
/// Removes an entry from cache or does nothing
|
||||
void removeHostFromCache(const std::string & host);
|
||||
|
||||
/// Updates all known hosts in cache.
|
||||
/// Returns true if IP of any host has been changed or an element was dropped (too many failures)
|
||||
bool updateCache(UInt32 max_consecutive_failures);
|
||||
|
@ -4,14 +4,15 @@
|
||||
#include <Common/Elf.h>
|
||||
#include <Common/SymbolIndex.h>
|
||||
#include <Common/MemorySanitizer.h>
|
||||
#include <base/CachedFn.h>
|
||||
#include <base/demangle.h>
|
||||
|
||||
#include <atomic>
|
||||
#include <cstring>
|
||||
#include <filesystem>
|
||||
#include <mutex>
|
||||
#include <sstream>
|
||||
#include <unordered_map>
|
||||
#include <map>
|
||||
|
||||
#include <Common/config.h>
|
||||
|
||||
@ -462,20 +463,36 @@ std::string StackTrace::toString(void ** frame_pointers_, size_t offset, size_t
|
||||
return toStringStatic(frame_pointers_copy, offset, size);
|
||||
}
|
||||
|
||||
static CachedFn<&toStringImpl> & cacheInstance()
|
||||
using StackTraceRepresentation = std::tuple<StackTrace::FramePointers, size_t, size_t>;
|
||||
using StackTraceCache = std::map<StackTraceRepresentation, std::string>;
|
||||
|
||||
static StackTraceCache & cacheInstance()
|
||||
{
|
||||
static CachedFn<&toStringImpl> cache;
|
||||
static StackTraceCache cache;
|
||||
return cache;
|
||||
}
|
||||
|
||||
static std::mutex stacktrace_cache_mutex;
|
||||
|
||||
std::string StackTrace::toStringStatic(const StackTrace::FramePointers & frame_pointers, size_t offset, size_t size)
|
||||
{
|
||||
/// Calculation of stack trace text is extremely slow.
|
||||
/// We use simple cache because otherwise the server could be overloaded by trash queries.
|
||||
return cacheInstance()(frame_pointers, offset, size);
|
||||
/// Note that this cache can grow unconditionally, but practically it should be small.
|
||||
std::lock_guard lock{stacktrace_cache_mutex};
|
||||
|
||||
StackTraceRepresentation key{frame_pointers, offset, size};
|
||||
auto & cache = cacheInstance();
|
||||
if (cache.contains(key))
|
||||
return cache[key];
|
||||
|
||||
auto result = toStringImpl(frame_pointers, offset, size);
|
||||
cache[key] = result;
|
||||
return result;
|
||||
}
|
||||
|
||||
void StackTrace::dropCache()
|
||||
{
|
||||
cacheInstance().drop();
|
||||
std::lock_guard lock{stacktrace_cache_mutex};
|
||||
cacheInstance().clear();
|
||||
}
|
||||
|
@ -20,8 +20,6 @@ UInt32 getSupportedArchs()
|
||||
result |= static_cast<UInt32>(TargetArch::AVX512BW);
|
||||
if (Cpu::CpuFlagsCache::have_AVX512VBMI)
|
||||
result |= static_cast<UInt32>(TargetArch::AVX512VBMI);
|
||||
if (Cpu::CpuFlagsCache::have_AVX512VBMI2)
|
||||
result |= static_cast<UInt32>(TargetArch::AVX512VBMI2);
|
||||
return result;
|
||||
}
|
||||
|
||||
@ -40,9 +38,8 @@ String toString(TargetArch arch)
|
||||
case TargetArch::AVX: return "avx";
|
||||
case TargetArch::AVX2: return "avx2";
|
||||
case TargetArch::AVX512F: return "avx512f";
|
||||
case TargetArch::AVX512BW: return "avx512bw";
|
||||
case TargetArch::AVX512VBMI: return "avx512vbmi";
|
||||
case TargetArch::AVX512VBMI2: return "avx512vbmi";
|
||||
case TargetArch::AVX512BW: return "avx512bw";
|
||||
case TargetArch::AVX512VBMI: return "avx512vbmi";
|
||||
}
|
||||
|
||||
__builtin_unreachable();
|
||||
|
@ -31,7 +31,7 @@
|
||||
* int funcImpl() {
|
||||
* return 2;
|
||||
* }
|
||||
* ) // DECLARE_AVX2_SPECIFIC_CODE
|
||||
* ) // DECLARE_DEFAULT_CODE
|
||||
*
|
||||
* int func() {
|
||||
* #if USE_MULTITARGET_CODE
|
||||
@ -80,9 +80,8 @@ enum class TargetArch : UInt32
|
||||
AVX = (1 << 1),
|
||||
AVX2 = (1 << 2),
|
||||
AVX512F = (1 << 3),
|
||||
AVX512BW = (1 << 4),
|
||||
AVX512VBMI = (1 << 5),
|
||||
AVX512VBMI2 = (1 << 6),
|
||||
AVX512BW = (1 << 4),
|
||||
AVX512VBMI = (1 << 5),
|
||||
};
|
||||
|
||||
/// Runtime detection.
|
||||
@ -101,7 +100,6 @@ String toString(TargetArch arch);
|
||||
|
||||
#if defined(__clang__)
|
||||
|
||||
#define AVX512VBMI2_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2")))
|
||||
#define AVX512VBMI_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi")))
|
||||
#define AVX512BW_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw")))
|
||||
#define AVX512_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f")))
|
||||
@ -110,8 +108,6 @@ String toString(TargetArch arch);
|
||||
#define SSE42_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt")))
|
||||
#define DEFAULT_FUNCTION_SPECIFIC_ATTRIBUTE
|
||||
|
||||
# define BEGIN_AVX512VBMI2_SPECIFIC_CODE \
|
||||
_Pragma("clang attribute push(__attribute__((target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2\"))),apply_to=function)")
|
||||
# define BEGIN_AVX512VBMI_SPECIFIC_CODE \
|
||||
_Pragma("clang attribute push(__attribute__((target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi\"))),apply_to=function)")
|
||||
# define BEGIN_AVX512BW_SPECIFIC_CODE \
|
||||
@ -133,7 +129,6 @@ String toString(TargetArch arch);
|
||||
# define DUMMY_FUNCTION_DEFINITION [[maybe_unused]] void _dummy_function_definition();
|
||||
#else
|
||||
|
||||
#define AVX512VBMI2_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2,tune=native")))
|
||||
#define AVX512VBMI_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,tune=native")))
|
||||
#define AVX512BW_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,tune=native")))
|
||||
#define AVX512_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,tune=native")))
|
||||
@ -142,9 +137,6 @@ String toString(TargetArch arch);
|
||||
#define SSE42_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt",tune=native)))
|
||||
#define DEFAULT_FUNCTION_SPECIFIC_ATTRIBUTE
|
||||
|
||||
# define BEGIN_AVX512VBMI2_SPECIFIC_CODE \
|
||||
_Pragma("GCC push_options") \
|
||||
_Pragma("GCC target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2,tune=native\")")
|
||||
# define BEGIN_AVX512VBMI_SPECIFIC_CODE \
|
||||
_Pragma("GCC push_options") \
|
||||
_Pragma("GCC target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,tune=native\")")
|
||||
@ -225,16 +217,6 @@ namespace TargetSpecific::AVX512VBMI { \
|
||||
} \
|
||||
END_TARGET_SPECIFIC_CODE
|
||||
|
||||
#define DECLARE_AVX512VBMI2_SPECIFIC_CODE(...) \
|
||||
BEGIN_AVX512VBMI2_SPECIFIC_CODE \
|
||||
namespace TargetSpecific::AVX512VBMI2 { \
|
||||
DUMMY_FUNCTION_DEFINITION \
|
||||
using namespace DB::TargetSpecific::AVX512VBMI2; \
|
||||
__VA_ARGS__ \
|
||||
} \
|
||||
END_TARGET_SPECIFIC_CODE
|
||||
|
||||
|
||||
#else
|
||||
|
||||
#define USE_MULTITARGET_CODE 0
|
||||
@ -247,7 +229,6 @@ END_TARGET_SPECIFIC_CODE
|
||||
#define DECLARE_AVX512F_SPECIFIC_CODE(...)
|
||||
#define DECLARE_AVX512BW_SPECIFIC_CODE(...)
|
||||
#define DECLARE_AVX512VBMI_SPECIFIC_CODE(...)
|
||||
#define DECLARE_AVX512VBMI2_SPECIFIC_CODE(...)
|
||||
|
||||
#endif
|
||||
|
||||
@ -264,9 +245,8 @@ DECLARE_SSE42_SPECIFIC_CODE (__VA_ARGS__) \
|
||||
DECLARE_AVX_SPECIFIC_CODE (__VA_ARGS__) \
|
||||
DECLARE_AVX2_SPECIFIC_CODE (__VA_ARGS__) \
|
||||
DECLARE_AVX512F_SPECIFIC_CODE(__VA_ARGS__) \
|
||||
DECLARE_AVX512BW_SPECIFIC_CODE (__VA_ARGS__) \
|
||||
DECLARE_AVX512VBMI_SPECIFIC_CODE (__VA_ARGS__) \
|
||||
DECLARE_AVX512VBMI2_SPECIFIC_CODE (__VA_ARGS__)
|
||||
DECLARE_AVX512BW_SPECIFIC_CODE(__VA_ARGS__) \
|
||||
DECLARE_AVX512VBMI_SPECIFIC_CODE(__VA_ARGS__)
|
||||
|
||||
DECLARE_DEFAULT_CODE(
|
||||
constexpr auto BuildArch = TargetArch::Default; /// NOLINT
|
||||
@ -296,9 +276,6 @@ DECLARE_AVX512VBMI_SPECIFIC_CODE(
|
||||
constexpr auto BuildArch = TargetArch::AVX512VBMI; /// NOLINT
|
||||
) // DECLARE_AVX512VBMI_SPECIFIC_CODE
|
||||
|
||||
DECLARE_AVX512VBMI2_SPECIFIC_CODE(
|
||||
constexpr auto BuildArch = TargetArch::AVX512VBMI2; /// NOLINT
|
||||
) // DECLARE_AVX512VBMI2_SPECIFIC_CODE
|
||||
|
||||
/** Runtime Dispatch helpers for class members.
|
||||
*
|
||||
|
@ -1,54 +0,0 @@
|
||||
#include <gtest/gtest.h>
|
||||
#include <thread>
|
||||
#include <chrono>
|
||||
#include <base/CachedFn.h>
|
||||
|
||||
using namespace std::chrono_literals;
|
||||
|
||||
constexpr int add(int x, int y)
|
||||
{
|
||||
return x + y;
|
||||
}
|
||||
|
||||
int longFunction(int x, int y)
|
||||
{
|
||||
std::this_thread::sleep_for(1s);
|
||||
return x + y;
|
||||
}
|
||||
|
||||
auto f = [](int x, int y) { return x - y; };
|
||||
|
||||
TEST(CachedFn, Basic)
|
||||
{
|
||||
CachedFn<&add> fn;
|
||||
|
||||
const int res = fn(1, 2);
|
||||
EXPECT_EQ(fn(1, 2), res);
|
||||
|
||||
/// In GCC, lambda can't be placed in TEST, producing "<labmda> has no linkage".
|
||||
/// Assuming http://www.open-std.org/jtc1/sc22/wg21/docs/papers/2014/n4268.html,
|
||||
/// this is a GCC bug.
|
||||
CachedFn<+f> fn2;
|
||||
|
||||
const int res2 = fn2(1, 2);
|
||||
EXPECT_EQ(fn2(1, 2), res2);
|
||||
}
|
||||
|
||||
TEST(CachedFn, CachingResults)
|
||||
{
|
||||
CachedFn<&longFunction> fn;
|
||||
|
||||
for (int x = 0; x < 2; ++x)
|
||||
{
|
||||
for (int y = 0; y < 2; ++y)
|
||||
{
|
||||
const int res = fn(x, y);
|
||||
const time_t start = time(nullptr);
|
||||
|
||||
for (int count = 0; count < 1000; ++count)
|
||||
EXPECT_EQ(fn(x, y), res);
|
||||
|
||||
EXPECT_LT(time(nullptr) - start, 10);
|
||||
}
|
||||
}
|
||||
}
|
@ -240,7 +240,6 @@ CachedOnDiskReadBufferFromFile::ImplementationBufferPtr
|
||||
CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & file_segment)
|
||||
{
|
||||
auto download_state = file_segment->state();
|
||||
LOG_TEST(log, "getReadBufferForFileSegment: {}", file_segment->getInfoForLog());
|
||||
|
||||
if (settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache)
|
||||
{
|
||||
@ -251,7 +250,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
|
||||
LOG_TEST(log, "Bypassing cache because `read_from_filesystem_cache_if_exists_otherwise_bypass_cache` option is used");
|
||||
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
|
||||
return getRemoteFSReadBuffer(*file_segment, read_type);
|
||||
}
|
||||
@ -263,7 +262,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
|
||||
{
|
||||
case FileSegment::State::SKIP_CACHE:
|
||||
{
|
||||
LOG_DEBUG(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
|
||||
LOG_TRACE(log, "Bypassing cache because file segment state is `SKIP_CACHE`");
|
||||
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
|
||||
return getRemoteFSReadBuffer(*file_segment, read_type);
|
||||
}
|
||||
@ -358,7 +357,7 @@ CachedOnDiskReadBufferFromFile::getReadBufferForFileSegment(FileSegmentPtr & fil
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Bypassing cache because file segment state is `PARTIALLY_DOWNLOADED_NO_CONTINUATION` and downloaded part already used");
|
||||
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
|
||||
@ -658,7 +657,7 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegmentPtr & file_segment)
|
||||
implementation_buffer->setReadUntilPosition(file_segment->range().right + 1); /// [..., range.right]
|
||||
implementation_buffer->seek(file_offset_of_buffer_end, SEEK_SET);
|
||||
|
||||
LOG_TEST(
|
||||
LOG_TRACE(
|
||||
log,
|
||||
"Predownload failed because of space limit. "
|
||||
"Will read from remote filesystem starting from offset: {}",
|
||||
@ -786,10 +785,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
assertCorrectness();
|
||||
|
||||
if (file_offset_of_buffer_end == read_until_position)
|
||||
{
|
||||
LOG_TEST(log, "Read finished on offset {}", file_offset_of_buffer_end);
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!initialized)
|
||||
initialize(file_offset_of_buffer_end, getTotalSizeToRead());
|
||||
@ -813,10 +809,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
{
|
||||
bool need_complete_file_segment = file_segment->isDownloader();
|
||||
if (need_complete_file_segment)
|
||||
{
|
||||
LOG_TEST(log, "Resetting downloader {} from scope exit", file_segment->getDownloader());
|
||||
file_segment->completePartAndResetDownloader();
|
||||
}
|
||||
}
|
||||
|
||||
chassert(!file_segment->isDownloader());
|
||||
@ -956,12 +949,12 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|
||||
else
|
||||
{
|
||||
chassert(file_segment->state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
LOG_TEST(log, "Bypassing cache because writeCache method failed");
|
||||
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_DEBUG(log, "No space left in cache, will continue without cache download");
|
||||
LOG_TRACE(log, "No space left in cache, will continue without cache download");
|
||||
file_segment->completeWithState(FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
|
||||
}
|
||||
|
||||
|
@ -39,6 +39,7 @@ StoragePolicy::StoragePolicy(
|
||||
const String & config_prefix,
|
||||
DiskSelectorPtr disks)
|
||||
: name(std::move(name_))
|
||||
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
|
||||
{
|
||||
Poco::Util::AbstractConfiguration::Keys keys;
|
||||
String volumes_prefix = config_prefix + ".volumes";
|
||||
@ -81,11 +82,15 @@ StoragePolicy::StoragePolicy(
|
||||
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
buildVolumeIndices();
|
||||
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
|
||||
}
|
||||
|
||||
|
||||
StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_)
|
||||
: volumes(std::move(volumes_)), name(std::move(name_)), move_factor(move_factor_)
|
||||
: volumes(std::move(volumes_))
|
||||
, name(std::move(name_))
|
||||
, move_factor(move_factor_)
|
||||
, log(&Poco::Logger::get("StoragePolicy (" + name + ")"))
|
||||
{
|
||||
if (volumes.empty())
|
||||
throw Exception("Storage policy " + backQuote(name) + " must contain at least one Volume.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
|
||||
@ -94,6 +99,7 @@ StoragePolicy::StoragePolicy(String name_, Volumes volumes_, double move_factor_
|
||||
throw Exception("Disk move factor have to be in [0., 1.] interval, but set to " + toString(move_factor) + " in storage policy " + backQuote(name), ErrorCodes::LOGICAL_ERROR);
|
||||
|
||||
buildVolumeIndices();
|
||||
LOG_TRACE(log, "Storage policy {} created, total volumes {}", name, volumes.size());
|
||||
}
|
||||
|
||||
|
||||
@ -206,12 +212,16 @@ UInt64 StoragePolicy::getMaxUnreservedFreeSpace() const
|
||||
|
||||
ReservationPtr StoragePolicy::reserve(UInt64 bytes, size_t min_volume_index) const
|
||||
{
|
||||
LOG_TRACE(log, "Reserving bytes {} from volume index {}, total volumes {}", bytes, min_volume_index, volumes.size());
|
||||
for (size_t i = min_volume_index; i < volumes.size(); ++i)
|
||||
{
|
||||
const auto & volume = volumes[i];
|
||||
auto reservation = volume->reserve(bytes);
|
||||
if (reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Successfully reserved {} bytes on volume index {}", bytes, i);
|
||||
return reservation;
|
||||
}
|
||||
}
|
||||
return {};
|
||||
}
|
||||
|
@ -104,6 +104,8 @@ private:
|
||||
double move_factor = 0.1; /// by default move factor is 10%
|
||||
|
||||
void buildVolumeIndices();
|
||||
|
||||
Poco::Logger * log;
|
||||
};
|
||||
|
||||
|
||||
|
@ -1,3 +1,4 @@
|
||||
#include "Common/DNSResolver.h"
|
||||
#include <Common/config.h>
|
||||
|
||||
#if USE_AWS_S3
|
||||
@ -257,6 +258,9 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
|
||||
if (!request_configuration.proxy_host.empty())
|
||||
{
|
||||
if (enable_s3_requests_logging)
|
||||
LOG_TEST(log, "Due to reverse proxy host name ({}) won't be resolved on ClickHouse side", uri);
|
||||
|
||||
/// Reverse proxy can replace host header with resolved ip address instead of host name.
|
||||
/// This can lead to request signature difference on S3 side.
|
||||
session = makeHTTPSession(target_uri, timeouts, /* resolve_host = */ false);
|
||||
@ -443,6 +447,10 @@ void PocoHTTPClient::makeRequestInternal(
|
||||
response->SetClientErrorMessage(getCurrentExceptionMessage(false));
|
||||
|
||||
addMetric(request, S3MetricType::Errors);
|
||||
|
||||
/// Probably this is socket timeout or something more or less related to DNS
|
||||
/// Let's just remove this host from DNS cache to be more safe
|
||||
DNSResolver::instance().removeHostFromCache(Poco::URI(uri).getHost());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,7 @@
|
||||
|
||||
#include <Poco/Util/AbstractConfiguration.h>
|
||||
#include <Common/Exception.h>
|
||||
#include <IO/ReadHelpers.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
@ -16,7 +17,7 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
|
||||
if (!config.has(config_prefix + ".max_size"))
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected cache size (`max_size`) in configuration");
|
||||
|
||||
max_size = config.getUInt64(config_prefix + ".max_size", 0);
|
||||
max_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_size"));
|
||||
if (max_size == 0)
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Expected non-zero size for cache configuration");
|
||||
|
||||
@ -25,7 +26,10 @@ void FileCacheSettings::loadFromConfig(const Poco::Util::AbstractConfiguration &
|
||||
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Disk Cache requires non-empty `path` field (cache base path) in config");
|
||||
|
||||
max_elements = config.getUInt64(config_prefix + ".max_elements", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_ELEMENTS);
|
||||
max_file_segment_size = config.getUInt64(config_prefix + ".max_file_segment_size", REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE);
|
||||
if (config.has(config_prefix + ".max_file_segment_size"))
|
||||
max_file_segment_size = parseWithSizeSuffix<uint64_t>(config.getString(config_prefix + ".max_file_segment_size"));
|
||||
else
|
||||
max_file_segment_size = REMOTE_FS_OBJECTS_CACHE_DEFAULT_MAX_FILE_SEGMENT_SIZE;
|
||||
|
||||
cache_on_write_operations = config.getUInt64(config_prefix + ".cache_on_write_operations", false);
|
||||
enable_filesystem_query_cache_limit = config.getUInt64(config_prefix + ".enable_filesystem_query_cache_limit", false);
|
||||
|
@ -34,7 +34,7 @@ IFileCachePriority::WriteIterator LRUFileCachePriority::add(const Key & key, siz
|
||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheSize, size);
|
||||
CurrentMetrics::add(CurrentMetrics::FilesystemCacheElements);
|
||||
|
||||
LOG_DEBUG(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
|
||||
LOG_TRACE(log, "Added entry into LRU queue, key: {}, offset: {}", key.toString(), offset);
|
||||
|
||||
return std::make_shared<LRUFileCacheIterator>(this, iter);
|
||||
}
|
||||
@ -54,7 +54,7 @@ void LRUFileCachePriority::removeAll(std::lock_guard<std::mutex> &)
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, cache_size);
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements, queue.size());
|
||||
|
||||
LOG_DEBUG(log, "Removed all entries from LRU queue");
|
||||
LOG_TRACE(log, "Removed all entries from LRU queue");
|
||||
|
||||
queue.clear();
|
||||
cache_size = 0;
|
||||
@ -88,7 +88,7 @@ void LRUFileCachePriority::LRUFileCacheIterator::removeAndGetNext(std::lock_guar
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheSize, queue_iter->size);
|
||||
CurrentMetrics::sub(CurrentMetrics::FilesystemCacheElements);
|
||||
|
||||
LOG_DEBUG(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
|
||||
LOG_TRACE(cache_priority->log, "Removed entry from LRU queue, key: {}, offset: {}", queue_iter->key.toString(), queue_iter->offset);
|
||||
|
||||
queue_iter = cache_priority->queue.erase(queue_iter);
|
||||
}
|
||||
|
@ -350,7 +350,10 @@ void ThreadStatus::detachQuery(bool exit_if_already_detached, bool thread_exits)
|
||||
|
||||
/// Avoid leaking of ThreadGroupStatus::finished_threads_counters_memory
|
||||
/// (this is in case someone uses system thread but did not call getProfileEventsCountersAndMemoryForThreads())
|
||||
thread_group->getProfileEventsCountersAndMemoryForThreads();
|
||||
{
|
||||
std::lock_guard guard(thread_group->mutex);
|
||||
auto stats = std::move(thread_group->finished_threads_counters_memory);
|
||||
}
|
||||
|
||||
thread_group.reset();
|
||||
|
||||
|
@ -24,6 +24,10 @@ public:
|
||||
void describeActions(JSONBuilder::JSONMap & map) const override;
|
||||
void describeActions(FormatSettings & settings) const override;
|
||||
|
||||
bool isPreliminary() const { return pre_distinct; }
|
||||
|
||||
UInt64 getLimitHint() const { return limit_hint; }
|
||||
|
||||
private:
|
||||
void updateOutputStream() override;
|
||||
|
||||
|
@ -54,16 +54,20 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
|
||||
/// Update information about prefix sort description in SortingStep.
|
||||
size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes);
|
||||
|
||||
/// Reading in order from MergeTree table if DISTINCT columns match or form a prefix of MergeTree sorting key
|
||||
size_t tryDistinctReadInOrder(QueryPlan::Node * node, QueryPlan::Nodes & nodes);
|
||||
|
||||
inline const auto & getOptimizations()
|
||||
{
|
||||
static const std::array<Optimization, 7> optimizations = {{
|
||||
static const std::array<Optimization, 8> optimizations = {{
|
||||
{tryLiftUpArrayJoin, "liftUpArrayJoin", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryPushDownLimit, "pushDownLimit", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{trySplitFilter, "splitFilter", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryMergeExpressions, "mergeExpressions", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryPushDownFilter, "pushDownFilter", &QueryPlanOptimizationSettings::filter_push_down},
|
||||
{tryExecuteFunctionsAfterSorting, "liftUpFunctions", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan}
|
||||
{tryReuseStorageOrderingForWindowFunctions, "reuseStorageOrderingForWindowFunctions", &QueryPlanOptimizationSettings::optimize_plan},
|
||||
{tryDistinctReadInOrder, "distinctReadInOrder", &QueryPlanOptimizationSettings::distinct_in_order},
|
||||
}};
|
||||
|
||||
return optimizations;
|
||||
|
@ -11,6 +11,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
|
||||
settings.optimize_plan = from.query_plan_enable_optimizations;
|
||||
settings.max_optimizations_to_apply = from.query_plan_max_optimizations_to_apply;
|
||||
settings.filter_push_down = from.query_plan_filter_push_down;
|
||||
settings.distinct_in_order = from.optimize_distinct_in_order;
|
||||
return settings;
|
||||
}
|
||||
|
||||
|
@ -21,6 +21,9 @@ struct QueryPlanOptimizationSettings
|
||||
/// If filter push down optimization is enabled.
|
||||
bool filter_push_down = true;
|
||||
|
||||
/// if distinct in order optimization is enabled
|
||||
bool distinct_in_order = false;
|
||||
|
||||
static QueryPlanOptimizationSettings fromSettings(const Settings & from);
|
||||
static QueryPlanOptimizationSettings fromContext(ContextPtr from);
|
||||
};
|
||||
|
@ -0,0 +1,97 @@
|
||||
#include <memory>
|
||||
#include <Processors/QueryPlan/DistinctStep.h>
|
||||
#include <Processors/QueryPlan/ITransformingStep.h>
|
||||
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
|
||||
#include <Processors/QueryPlan/ReadFromMergeTree.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
|
||||
namespace DB::QueryPlanOptimizations
|
||||
{
|
||||
size_t tryDistinctReadInOrder(QueryPlan::Node * parent_node, QueryPlan::Nodes &)
|
||||
{
|
||||
/// check if it is preliminary distinct node
|
||||
DistinctStep * pre_distinct = nullptr;
|
||||
if (auto * distinct = typeid_cast<DistinctStep *>(parent_node->step.get()); distinct)
|
||||
{
|
||||
if (distinct->isPreliminary())
|
||||
pre_distinct = distinct;
|
||||
}
|
||||
if (!pre_distinct)
|
||||
return 0;
|
||||
|
||||
/// walk through the plan
|
||||
/// (1) check if nodes below preliminary distinct preserve sorting
|
||||
/// (2) gather transforming steps to update their sorting properties later
|
||||
std::vector<ITransformingStep *> steps2update;
|
||||
QueryPlan::Node * node = parent_node;
|
||||
while (!node->children.empty())
|
||||
{
|
||||
auto * step = dynamic_cast<ITransformingStep *>(node->step.get());
|
||||
if (!step)
|
||||
return 0;
|
||||
|
||||
const ITransformingStep::DataStreamTraits & traits = step->getDataStreamTraits();
|
||||
if (!traits.preserves_sorting)
|
||||
return 0;
|
||||
|
||||
steps2update.push_back(step);
|
||||
|
||||
node = node->children.front();
|
||||
}
|
||||
|
||||
/// check if we read from MergeTree
|
||||
auto * read_from_merge_tree = typeid_cast<ReadFromMergeTree *>(node->step.get());
|
||||
if (!read_from_merge_tree)
|
||||
return 0;
|
||||
|
||||
/// find non-const columns in DISTINCT
|
||||
const ColumnsWithTypeAndName & distinct_columns = pre_distinct->getOutputStream().header.getColumnsWithTypeAndName();
|
||||
std::set<std::string_view> non_const_columns;
|
||||
for (const auto & column : distinct_columns)
|
||||
{
|
||||
if (!isColumnConst(*column.column))
|
||||
non_const_columns.emplace(column.name);
|
||||
}
|
||||
|
||||
const Names& sorting_key_columns = read_from_merge_tree->getStorageMetadata()->getSortingKeyColumns();
|
||||
/// check if DISTINCT has the same columns as sorting key
|
||||
size_t number_of_sorted_distinct_columns = 0;
|
||||
for (const auto & column_name : sorting_key_columns)
|
||||
{
|
||||
if (non_const_columns.end() == non_const_columns.find(column_name))
|
||||
break;
|
||||
|
||||
++number_of_sorted_distinct_columns;
|
||||
}
|
||||
/// apply optimization only when distinct columns match or form prefix of sorting key
|
||||
/// todo: check if reading in order optimization would be beneficial when sorting key is prefix of columns in DISTINCT
|
||||
if (number_of_sorted_distinct_columns != non_const_columns.size())
|
||||
return 0;
|
||||
|
||||
/// check if another read in order optimization is already applied
|
||||
/// apply optimization only if another read in order one uses less sorting columns
|
||||
/// example: SELECT DISTINCT a, b FROM t ORDER BY a; -- sorting key: a, b
|
||||
/// if read in order for ORDER BY is already applied, then output sort description will contain only column `a`
|
||||
/// but we need columns `a, b`, applying read in order for distinct will still benefit `order by`
|
||||
const DataStream & output_data_stream = read_from_merge_tree->getOutputStream();
|
||||
const SortDescription & output_sort_desc = output_data_stream.sort_description;
|
||||
if (output_data_stream.sort_scope != DataStream::SortScope::Chunk && number_of_sorted_distinct_columns <= output_sort_desc.size())
|
||||
return 0;
|
||||
|
||||
/// update input order info in read_from_merge_tree step
|
||||
const int direction = 0; /// for DISTINCT direction doesn't matter, ReadFromMergeTree will choose proper one
|
||||
read_from_merge_tree->requestReadingInOrder(number_of_sorted_distinct_columns, direction, pre_distinct->getLimitHint());
|
||||
|
||||
/// update data stream's sorting properties for found transforms
|
||||
const DataStream * input_stream = &read_from_merge_tree->getOutputStream();
|
||||
while (!steps2update.empty())
|
||||
{
|
||||
steps2update.back()->updateInputStream(*input_stream);
|
||||
input_stream = &steps2update.back()->getOutputStream();
|
||||
steps2update.pop_back();
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
}
|
@ -91,8 +91,6 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
|
||||
window->getWindowDescription().full_sort_description,
|
||||
query_info.syntax_analyzer_result);
|
||||
|
||||
read_from_merge_tree->setQueryInfoOrderOptimizer(order_optimizer);
|
||||
|
||||
/// If we don't have filtration, we can pushdown limit to reading stage for optimizations.
|
||||
UInt64 limit = (select_query->hasFiltration() || select_query->groupBy()) ? 0 : InterpreterSelectQuery::getLimitForSorting(*select_query, context);
|
||||
|
||||
@ -103,7 +101,7 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
|
||||
|
||||
if (order_info)
|
||||
{
|
||||
read_from_merge_tree->setQueryInfoInputOrderInfo(order_info);
|
||||
read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
|
||||
sorting->convertToFinishSorting(order_info->sort_description_for_merging);
|
||||
}
|
||||
|
||||
|
@ -153,7 +153,6 @@ ReadFromMergeTree::ReadFromMergeTree(
|
||||
}
|
||||
|
||||
output_stream->sort_description = std::move(sort_description);
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
@ -1019,28 +1018,38 @@ MergeTreeDataSelectAnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
|
||||
return std::make_shared<MergeTreeDataSelectAnalysisResult>(MergeTreeDataSelectAnalysisResult{.result = std::move(result)});
|
||||
}
|
||||
|
||||
void ReadFromMergeTree::setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> order_optimizer)
|
||||
void ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t limit)
|
||||
{
|
||||
if (query_info.projection)
|
||||
{
|
||||
query_info.projection->order_optimizer = order_optimizer;
|
||||
}
|
||||
else
|
||||
{
|
||||
query_info.order_optimizer = order_optimizer;
|
||||
}
|
||||
}
|
||||
/// if dirction is not set, use current one
|
||||
if (!direction)
|
||||
direction = getSortDirection();
|
||||
|
||||
void ReadFromMergeTree::setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info)
|
||||
{
|
||||
auto order_info = std::make_shared<InputOrderInfo>(SortDescription{}, prefix_size, direction, limit);
|
||||
if (query_info.projection)
|
||||
{
|
||||
query_info.projection->input_order_info = order_info;
|
||||
}
|
||||
else
|
||||
{
|
||||
query_info.input_order_info = order_info;
|
||||
|
||||
/// update sort info for output stream
|
||||
SortDescription sort_description;
|
||||
const Names & sorting_key_columns = storage_snapshot->getMetadataForQuery()->getSortingKeyColumns();
|
||||
const Block & header = output_stream->header;
|
||||
const int sort_direction = getSortDirection();
|
||||
for (const auto & column_name : sorting_key_columns)
|
||||
{
|
||||
if (std::find_if(header.begin(), header.end(), [&](ColumnWithTypeAndName const & col) { return col.name == column_name; })
|
||||
== header.end())
|
||||
break;
|
||||
sort_description.emplace_back(column_name, sort_direction);
|
||||
}
|
||||
if (sort_description.empty())
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Sort description can't be empty when reading in order");
|
||||
|
||||
const size_t used_prefix_of_sorting_key_size = order_info->used_prefix_of_sorting_key_size;
|
||||
if (sort_description.size() > used_prefix_of_sorting_key_size)
|
||||
sort_description.resize(used_prefix_of_sorting_key_size);
|
||||
output_stream->sort_description = std::move(sort_description);
|
||||
output_stream->sort_scope = DataStream::SortScope::Stream;
|
||||
}
|
||||
|
||||
ReadFromMergeTree::AnalysisResult ReadFromMergeTree::getAnalysisResult() const
|
||||
|
@ -151,8 +151,7 @@ public:
|
||||
const SelectQueryInfo & getQueryInfo() const { return query_info; }
|
||||
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
|
||||
|
||||
void setQueryInfoOrderOptimizer(std::shared_ptr<ReadInOrderOptimizer> read_in_order_optimizer);
|
||||
void setQueryInfoInputOrderInfo(InputOrderInfoPtr order_info);
|
||||
void requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
|
||||
|
||||
private:
|
||||
int getSortDirection() const
|
||||
|
@ -223,54 +223,69 @@ void DataPartStorageOnDisk::remove(
|
||||
|
||||
/// NOTE relative_path can contain not only part name itself, but also some prefix like
|
||||
/// "moving/all_1_1_1" or "detached/all_2_3_5". We should handle this case more properly.
|
||||
if (part_dir_without_slash.has_parent_path())
|
||||
{
|
||||
auto parent_path = part_dir_without_slash.parent_path();
|
||||
if (parent_path == "detached")
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Trying to remove detached part {} with path {} in remove function. It shouldn't happen", part_dir, root_path);
|
||||
|
||||
part_dir_without_slash = parent_path / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
|
||||
}
|
||||
else
|
||||
{
|
||||
part_dir_without_slash = ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
|
||||
}
|
||||
|
||||
/// File might be already renamed on previous try
|
||||
bool has_delete_prefix = part_dir_without_slash.filename().string().starts_with("delete_tmp_");
|
||||
std::optional<CanRemoveDescription> can_remove_description;
|
||||
auto disk = volume->getDisk();
|
||||
fs::path to = fs::path(root_path) / part_dir_without_slash;
|
||||
|
||||
std::optional<CanRemoveDescription> can_remove_description;
|
||||
|
||||
auto disk = volume->getDisk();
|
||||
if (disk->exists(to))
|
||||
if (!has_delete_prefix)
|
||||
{
|
||||
LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
|
||||
if (part_dir_without_slash.has_parent_path())
|
||||
{
|
||||
auto parent_path = part_dir_without_slash.parent_path();
|
||||
if (parent_path == "detached")
|
||||
throw Exception(
|
||||
ErrorCodes::LOGICAL_ERROR,
|
||||
"Trying to remove detached part {} with path {} in remove function. It shouldn't happen",
|
||||
part_dir,
|
||||
root_path);
|
||||
|
||||
part_dir_without_slash = parent_path / ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
|
||||
}
|
||||
else
|
||||
{
|
||||
part_dir_without_slash = ("delete_tmp_" + std::string{part_dir_without_slash.filename()});
|
||||
}
|
||||
|
||||
to = fs::path(root_path) / part_dir_without_slash;
|
||||
|
||||
if (disk->exists(to))
|
||||
{
|
||||
LOG_WARNING(log, "Directory {} (to which part must be renamed before removing) already exists. "
|
||||
"Most likely this is due to unclean restart or race condition. Removing it.", fullPath(disk, to));
|
||||
try
|
||||
{
|
||||
can_remove_description.emplace(can_remove_callback());
|
||||
disk->removeSharedRecursive(
|
||||
fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
|
||||
}
|
||||
catch (...)
|
||||
{
|
||||
LOG_ERROR(
|
||||
log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
can_remove_description.emplace(can_remove_callback());
|
||||
disk->removeSharedRecursive(fs::path(to) / "", !can_remove_description->can_remove_anything, can_remove_description->files_not_to_remove);
|
||||
disk->moveDirectory(from, to);
|
||||
onRename(root_path, part_dir_without_slash);
|
||||
}
|
||||
catch (...)
|
||||
catch (const fs::filesystem_error & e)
|
||||
{
|
||||
LOG_ERROR(log, "Cannot recursively remove directory {}. Exception: {}", fullPath(disk, to), getCurrentExceptionMessage(false));
|
||||
if (e.code() == std::errc::no_such_file_or_directory)
|
||||
{
|
||||
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. "
|
||||
"Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, to));
|
||||
return;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
}
|
||||
|
||||
try
|
||||
{
|
||||
disk->moveDirectory(from, to);
|
||||
onRename(root_path, part_dir_without_slash);
|
||||
}
|
||||
catch (const fs::filesystem_error & e)
|
||||
{
|
||||
if (e.code() == std::errc::no_such_file_or_directory)
|
||||
{
|
||||
LOG_ERROR(log, "Directory {} (part to remove) doesn't exist or one of nested files has gone. Most likely this is due to manual removing. This should be discouraged. Ignoring.", fullPath(disk, to));
|
||||
return;
|
||||
}
|
||||
throw;
|
||||
}
|
||||
|
||||
if (!can_remove_description)
|
||||
can_remove_description.emplace(can_remove_callback());
|
||||
|
||||
|
@ -100,8 +100,10 @@ struct ReplicatedFetchReadCallback
|
||||
}
|
||||
|
||||
|
||||
Service::Service(StorageReplicatedMergeTree & data_) :
|
||||
data(data_), log(&Poco::Logger::get(data.getLogName() + " (Replicated PartsService)")) {}
|
||||
Service::Service(StorageReplicatedMergeTree & data_)
|
||||
: data(data_)
|
||||
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Replicated PartsService)"))
|
||||
{}
|
||||
|
||||
std::string Service::getId(const std::string & node_id) const
|
||||
{
|
||||
@ -444,6 +446,11 @@ MergeTreeData::DataPartPtr Service::findPart(const String & name)
|
||||
throw Exception(ErrorCodes::NO_SUCH_DATA_PART, "No part {} in table", name);
|
||||
}
|
||||
|
||||
Fetcher::Fetcher(StorageReplicatedMergeTree & data_)
|
||||
: data(data_)
|
||||
, log(&Poco::Logger::get(data.getStorageID().getNameForLogs() + " (Fetcher)"))
|
||||
{}
|
||||
|
||||
MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
const StorageMetadataPtr & metadata_snapshot,
|
||||
ContextPtr context,
|
||||
@ -494,6 +501,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
|
||||
if (disk)
|
||||
{
|
||||
LOG_TRACE(log, "Will fetch to disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
UInt64 revision = disk->getRevision();
|
||||
if (revision)
|
||||
uri.addQueryParameter("disk_revision", toString(revision));
|
||||
@ -504,13 +512,21 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
{
|
||||
if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Trying to fetch with zero-copy replication, but disk is not provided, will try to select");
|
||||
Disks disks = data.getDisks();
|
||||
for (const auto & data_disk : disks)
|
||||
{
|
||||
LOG_TRACE(log, "Checking disk {} with type {}", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
|
||||
if (data_disk->supportZeroCopyReplication())
|
||||
{
|
||||
LOG_TRACE(log, "Disk {} (with type {}) supports zero-copy replication", data_disk->getName(), toString(data_disk->getDataSourceDescription().type));
|
||||
capability.push_back(toString(data_disk->getDataSourceDescription().type));
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (disk->supportZeroCopyReplication())
|
||||
{
|
||||
LOG_TRACE(log, "Trying to fetch with zero copy replication, provided disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
capability.push_back(toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
}
|
||||
@ -562,29 +578,47 @@ MergeTreeData::MutableDataPartPtr Fetcher::fetchSelectedPart(
|
||||
ReadBufferFromString ttl_infos_buffer(ttl_infos_string);
|
||||
assertString("ttl format version: 1\n", ttl_infos_buffer);
|
||||
ttl_infos.read(ttl_infos_buffer);
|
||||
|
||||
if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using storage balanced reservation");
|
||||
reservation
|
||||
= data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, &ttl_infos, true);
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Disk for fetch is not provided, reserving space using TTL rules");
|
||||
reservation
|
||||
= data.reserveSpacePreferringTTLRules(metadata_snapshot, sum_files_size, ttl_infos, std::time(nullptr), 0, true);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Making balanced reservation");
|
||||
reservation = data.balancedReservation(metadata_snapshot, sum_files_size, 0, part_name, part_info, {}, tagger_ptr, nullptr);
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Making simple reservation");
|
||||
reservation = data.reserveSpace(sum_files_size);
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (!disk)
|
||||
{
|
||||
LOG_TRACE(log, "Making reservation on the largest disk");
|
||||
/// We don't know real size of part because sender server version is too old
|
||||
reservation = data.makeEmptyReservationOnLargestDisk();
|
||||
}
|
||||
|
||||
if (!disk)
|
||||
{
|
||||
disk = reservation->getDisk();
|
||||
LOG_INFO(log, "Disk for fetch is not provided, getting disk from reservation {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_INFO(log, "Disk for fetch is disk {} with type {}", disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
|
||||
UInt64 revision = parse<UInt64>(in->getResponseCookie("disk_revision", "0"));
|
||||
if (revision)
|
||||
@ -989,7 +1023,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToDiskRemoteMeta(
|
||||
|
||||
if (!disk->supportZeroCopyReplication() || !disk->checkUniqueId(part_id))
|
||||
{
|
||||
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {}.", part_name, part_id, disk->getName());
|
||||
throw Exception(ErrorCodes::ZERO_COPY_REPLICATION_ERROR, "Part {} unique id {} doesn't exist on {} (with type {}).", part_name, part_id, disk->getName(), toString(disk->getDataSourceDescription().type));
|
||||
}
|
||||
|
||||
LOG_DEBUG(log, "Downloading Part {} unique id {} metadata onto disk {}.",
|
||||
|
@ -67,7 +67,7 @@ private:
|
||||
class Fetcher final : private boost::noncopyable
|
||||
{
|
||||
public:
|
||||
explicit Fetcher(StorageReplicatedMergeTree & data_) : data(data_), log(&Poco::Logger::get("Fetcher")) {}
|
||||
explicit Fetcher(StorageReplicatedMergeTree & data_);
|
||||
|
||||
/// Downloads a part to tmp_directory. If to_detached - downloads to the `detached` directory.
|
||||
MergeTreeData::MutableDataPartPtr fetchSelectedPart(
|
||||
|
@ -4909,12 +4909,14 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
{
|
||||
expected_size = std::max(RESERVATION_MIN_ESTIMATION_SIZE, expected_size);
|
||||
|
||||
LOG_TRACE(log, "Trying reserve {} bytes preffering TTL rules", expected_size);
|
||||
ReservationPtr reservation;
|
||||
|
||||
auto move_ttl_entry = selectTTLDescriptionForTTLInfos(metadata_snapshot->getMoveTTLs(), ttl_infos.moves_ttl, time_of_move, true);
|
||||
|
||||
if (move_ttl_entry)
|
||||
{
|
||||
LOG_TRACE(log, "Got move TTL entry, will try to reserver destination for move");
|
||||
SpacePtr destination_ptr = getDestinationForMoveTTL(*move_ttl_entry, is_insert);
|
||||
if (!destination_ptr)
|
||||
{
|
||||
@ -4935,10 +4937,15 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
}
|
||||
else
|
||||
{
|
||||
LOG_TRACE(log, "Reserving bytes on selected destination");
|
||||
reservation = destination_ptr->reserve(expected_size);
|
||||
if (reservation)
|
||||
{
|
||||
LOG_TRACE(log, "Reservation successful");
|
||||
return reservation;
|
||||
}
|
||||
else
|
||||
{
|
||||
if (move_ttl_entry->destination_type == DataDestinationType::VOLUME)
|
||||
LOG_WARNING(
|
||||
log,
|
||||
@ -4951,15 +4958,22 @@ ReservationPtr MergeTreeData::tryReserveSpacePreferringTTLRules(
|
||||
"Would like to reserve space on disk '{}' by TTL rule of table '{}' but there is not enough space",
|
||||
move_ttl_entry->destination_name,
|
||||
*std::atomic_load(&log_name));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Prefer selected_disk
|
||||
if (selected_disk)
|
||||
{
|
||||
LOG_DEBUG(log, "Disk for reservation provided: {} (with type {})", selected_disk->getName(), toString(selected_disk->getDataSourceDescription().type));
|
||||
reservation = selected_disk->reserve(expected_size);
|
||||
}
|
||||
|
||||
if (!reservation)
|
||||
{
|
||||
LOG_DEBUG(log, "No reservation, reserving using storage policy from min volume index {}", min_volume_index);
|
||||
reservation = getStoragePolicy()->reserve(expected_size, min_volume_index);
|
||||
}
|
||||
|
||||
return reservation;
|
||||
}
|
||||
|
@ -202,7 +202,7 @@ InputOrderInfoPtr ReadInOrderOptimizer::getInputOrderImpl(
|
||||
const ContextPtr & context,
|
||||
UInt64 limit) const
|
||||
{
|
||||
auto sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
|
||||
const Names & sorting_key_columns = metadata_snapshot->getSortingKeyColumns();
|
||||
int read_direction = description.at(0).direction;
|
||||
|
||||
auto fixed_sorting_columns = getFixedSortingColumns(query, sorting_key_columns, context);
|
||||
|
@ -131,6 +131,15 @@ CI_CONFIG = {
|
||||
"tidy": "disable",
|
||||
"with_coverage": False,
|
||||
},
|
||||
"binary_aarch64_v80compat": {
|
||||
"compiler": "clang-15-aarch64-v80compat",
|
||||
"build_type": "",
|
||||
"sanitizer": "",
|
||||
"package_type": "binary",
|
||||
"libraries": "static",
|
||||
"tidy": "disable",
|
||||
"with_coverage": False,
|
||||
},
|
||||
"binary_freebsd": {
|
||||
"compiler": "clang-15-freebsd",
|
||||
"build_type": "",
|
||||
@ -189,6 +198,7 @@ CI_CONFIG = {
|
||||
"binary_shared",
|
||||
"binary_darwin",
|
||||
"binary_aarch64",
|
||||
"binary_aarch64_v80compat",
|
||||
"binary_freebsd",
|
||||
"binary_darwin_aarch64",
|
||||
"binary_ppc64le",
|
||||
|
@ -164,7 +164,7 @@ def gen_versions(
|
||||
# The order is important, PR number is used as cache during the build
|
||||
versions = [str(pr_info.number), pr_commit_version]
|
||||
result_version = pr_commit_version
|
||||
if pr_info.number == 0 and pr_info.base_name == "master":
|
||||
if pr_info.number == 0 and pr_info.base_ref == "master":
|
||||
# First get the latest for cache
|
||||
versions.insert(0, "latest")
|
||||
|
||||
|
@ -99,11 +99,11 @@ class TestDockerImageCheck(unittest.TestCase):
|
||||
|
||||
def test_gen_version(self):
|
||||
pr_info = PRInfo(PRInfo.default_event.copy())
|
||||
pr_info.base_name = "anything-else"
|
||||
pr_info.base_ref = "anything-else"
|
||||
versions, result_version = di.gen_versions(pr_info, None)
|
||||
self.assertEqual(versions, ["0", "0-HEAD"])
|
||||
self.assertEqual(result_version, "0-HEAD")
|
||||
pr_info.base_name = "master"
|
||||
pr_info.base_ref = "master"
|
||||
versions, result_version = di.gen_versions(pr_info, None)
|
||||
self.assertEqual(versions, ["latest", "0", "0-HEAD"])
|
||||
self.assertEqual(result_version, "0-HEAD")
|
||||
|
@ -132,9 +132,13 @@ class PRInfo:
|
||||
self.commit_html_url = f"{repo_prefix}/commits/{self.sha}"
|
||||
self.pr_html_url = f"{repo_prefix}/pull/{self.number}"
|
||||
|
||||
# master or backport/xx.x/xxxxx - where the PR will be merged
|
||||
self.base_ref = github_event["pull_request"]["base"]["ref"]
|
||||
# ClickHouse/ClickHouse
|
||||
self.base_name = github_event["pull_request"]["base"]["repo"]["full_name"]
|
||||
# any_branch-name - the name of working branch name
|
||||
self.head_ref = github_event["pull_request"]["head"]["ref"]
|
||||
# UserName/ClickHouse or ClickHouse/ClickHouse
|
||||
self.head_name = github_event["pull_request"]["head"]["repo"]["full_name"]
|
||||
self.body = github_event["pull_request"]["body"]
|
||||
self.labels = {
|
||||
|
@ -13,9 +13,7 @@
|
||||
<policies>
|
||||
<s3>
|
||||
<volumes>
|
||||
<main>
|
||||
<disk>s3</disk>
|
||||
</main>
|
||||
<main><disk>s3</disk></main>
|
||||
</volumes>
|
||||
</s3>
|
||||
</policies>
|
||||
|
@ -48,8 +48,8 @@
|
||||
<s3_cache>
|
||||
<type>cache</type>
|
||||
<disk>s3_disk</disk>
|
||||
<path>s3_disk_cache/</path>
|
||||
<max_size>22548578304</max_size>
|
||||
<path>s3_cache/</path>
|
||||
<max_size>2147483648</max_size>
|
||||
<cache_on_write_operations>1</cache_on_write_operations>
|
||||
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
|
||||
</s3_cache>
|
||||
@ -57,8 +57,9 @@
|
||||
<type>cache</type>
|
||||
<disk>s3_disk_2</disk>
|
||||
<path>s3_cache_2/</path>
|
||||
<max_size>22548578304</max_size>
|
||||
<max_size>2Gi</max_size>
|
||||
<do_not_evict_index_and_mark_files>0</do_not_evict_index_and_mark_files>
|
||||
<max_file_segment_size>100Mi</max_file_segment_size>
|
||||
</s3_cache_2>
|
||||
<s3_cache_3>
|
||||
<type>cache</type>
|
||||
|
@ -4,15 +4,10 @@
|
||||
<fill_query>INSERT INTO distinct_cardinality_high SELECT number % 1e6, number % 1e4, number % 1e2 FROM numbers_mt(1e8)</fill_query>
|
||||
|
||||
<query>SELECT DISTINCT high FROM distinct_cardinality_high FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, low FROM distinct_cardinality_high FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, medium, low FROM distinct_cardinality_high FORMAT Null</query>
|
||||
|
||||
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY high, medium FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY high FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, medium FROM distinct_cardinality_high ORDER BY medium FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, low FROM distinct_cardinality_high ORDER BY low FORMAT Null</query>
|
||||
<query>SELECT DISTINCT high, medium, low FROM distinct_cardinality_high ORDER BY low FORMAT Null</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS distinct_cardinality_high</drop_query>
|
||||
|
||||
@ -22,14 +17,9 @@
|
||||
|
||||
<query>SELECT DISTINCT low FROM distinct_cardinality_low FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, high FROM distinct_cardinality_low FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, medium, high FROM distinct_cardinality_low FORMAT Null</query>
|
||||
|
||||
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY low, medium FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY low FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, medium FROM distinct_cardinality_low ORDER BY medium FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, high FROM distinct_cardinality_low ORDER BY high FORMAT Null</query>
|
||||
<query>SELECT DISTINCT low, medium, high FROM distinct_cardinality_low ORDER BY high FORMAT Null</query>
|
||||
|
||||
<drop_query>DROP TABLE IF EXISTS distinct_cardinality_low</drop_query>
|
||||
</test>
|
||||
|
@ -15,6 +15,8 @@ INSERT INTO partslost_0 SELECT toString(number) AS x from system.numbers LIMIT 1
|
||||
|
||||
ALTER TABLE partslost_0 ADD INDEX idx x TYPE tokenbf_v1(285000, 3, 12345) GRANULARITY 3;
|
||||
|
||||
SET mutations_sync = 2;
|
||||
|
||||
ALTER TABLE partslost_0 MATERIALIZE INDEX idx;
|
||||
|
||||
-- In worst case doesn't check anything, but it's not flaky
|
||||
|
@ -104,3 +104,9 @@ select distinct a, b, x, y from (select a, b, 1 as x, 2 as y from distinct_in_or
|
||||
0 3 1 2
|
||||
0 4 1 2
|
||||
-- check that distinct in order returns the same result as ordinary distinct
|
||||
-- check that distinct in order WITH order by returns the same result as ordinary distinct
|
||||
0
|
||||
-- check that distinct in order WITHOUT order by returns the same result as ordinary distinct
|
||||
0
|
||||
-- check that distinct in order WITHOUT order by and WITH filter returns the same result as ordinary distinct
|
||||
0
|
||||
|
@ -65,11 +65,32 @@ INSERT INTO distinct_cardinality_low SELECT number % 1e1, number % 1e2, number %
|
||||
drop table if exists distinct_in_order sync;
|
||||
drop table if exists ordinary_distinct sync;
|
||||
|
||||
select '-- check that distinct in order WITH order by returns the same result as ordinary distinct';
|
||||
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into distinct_in_order select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=1;
|
||||
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into ordinary_distinct select distinct * from distinct_cardinality_low order by high settings optimize_distinct_in_order=0;
|
||||
select distinct * from distinct_in_order except select * from ordinary_distinct;
|
||||
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
|
||||
|
||||
drop table if exists distinct_in_order sync;
|
||||
drop table if exists ordinary_distinct sync;
|
||||
|
||||
select '-- check that distinct in order WITHOUT order by returns the same result as ordinary distinct';
|
||||
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into distinct_in_order select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=1;
|
||||
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into ordinary_distinct select distinct * from distinct_cardinality_low settings optimize_distinct_in_order=0;
|
||||
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
|
||||
|
||||
drop table if exists distinct_in_order;
|
||||
drop table if exists ordinary_distinct;
|
||||
|
||||
select '-- check that distinct in order WITHOUT order by and WITH filter returns the same result as ordinary distinct';
|
||||
create table distinct_in_order (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into distinct_in_order select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=1;
|
||||
create table ordinary_distinct (low UInt64, medium UInt64, high UInt64) engine=MergeTree() order by (low, medium);
|
||||
insert into ordinary_distinct select distinct * from distinct_cardinality_low where low > 0 settings optimize_distinct_in_order=0;
|
||||
select count() as diff from (select distinct * from distinct_in_order except select * from ordinary_distinct);
|
||||
|
||||
drop table if exists distinct_in_order;
|
||||
drop table if exists ordinary_distinct;
|
||||
|
@ -15,6 +15,9 @@ DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix and order by the same columns -> pre-distinct and final distinct optimization
|
||||
DistinctSortedStreamTransform
|
||||
DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix and order by columns are prefix of distinct columns -> pre-distinct and final distinct optimization
|
||||
DistinctSortedTransform
|
||||
DistinctSortedChunkTransform
|
||||
-- distinct with primary key prefix and order by column in distinct but non-primary key prefix -> pre-distinct and final distinct optimization
|
||||
DistinctSortedTransform
|
||||
DistinctSortedChunkTransform
|
||||
@ -33,3 +36,48 @@ DistinctTransform
|
||||
-- distinct with non-primary key prefix and order by _const_ column in distinct -> ordinary distinct
|
||||
DistinctTransform
|
||||
DistinctTransform
|
||||
-- Check reading in order for distinct
|
||||
-- disabled, distinct columns match sorting key
|
||||
MergeTreeThread
|
||||
-- enabled, distinct columns match sorting key
|
||||
MergeTreeInOrder
|
||||
-- enabled, distinct columns form prefix of sorting key
|
||||
MergeTreeInOrder
|
||||
-- enabled, distinct columns DON't form prefix of sorting key
|
||||
MergeTreeThread
|
||||
-- enabled, distinct columns contains constant columns, non-const columns form prefix of sorting key
|
||||
MergeTreeInOrder
|
||||
-- enabled, distinct columns contains constant columns, non-const columns match prefix of sorting key
|
||||
MergeTreeInOrder
|
||||
-- enabled, only part of distinct columns form prefix of sorting key
|
||||
MergeTreeThread
|
||||
-- enabled, check that sorting properties are propagated from ReadFromMergeTree till preliminary distinct
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
-- check that reading in order optimization for ORDER BY and DISTINCT applied correctly in the same query
|
||||
-- disabled, check that sorting description for ReadFromMergeTree match ORDER BY columns
|
||||
Sorting (Stream): a ASC
|
||||
Sorting (Stream): a ASC
|
||||
Sorting (Stream): a ASC
|
||||
-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization i.e. it contains columns from DISTINCT clause
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization, but direction used from ORDER BY clause
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (1), - it contains columns from ORDER BY clause
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (2), - direction used from ORDER BY clause
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
Sorting (Stream): a DESC, b DESC
|
||||
-- enabled, check that disabling other 'read in order' optimizations do not disable distinct in order optimization
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
Sorting (Stream): a ASC, b ASC
|
||||
|
@ -10,11 +10,16 @@ DISABLE_OPTIMIZATION="set optimize_distinct_in_order=0"
|
||||
ENABLE_OPTIMIZATION="set optimize_distinct_in_order=1"
|
||||
GREP_DISTINCT="grep 'DistinctSortedChunkTransform\|DistinctSortedStreamTransform\|DistinctSortedTransform\|DistinctTransform'"
|
||||
TRIM_LEADING_SPACES="sed -e 's/^[ \t]*//'"
|
||||
FIND_DISTINCT="$GREP_DISTINCT | $TRIM_LEADING_SPACES"
|
||||
REMOVE_NON_LETTERS="sed 's/[^a-zA-Z]//g'"
|
||||
FIND_DISTINCT="$GREP_DISTINCT | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
|
||||
FIND_READING_IN_ORDER="grep 'MergeTreeInOrder' | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
|
||||
FIND_READING_DEFAULT="grep 'MergeTreeThread' | $TRIM_LEADING_SPACES | $REMOVE_NON_LETTERS"
|
||||
FIND_SORTING_PROPERTIES="grep 'Sorting (Stream)' | $TRIM_LEADING_SPACES"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "drop table if exists distinct_in_order_explain sync"
|
||||
$CLICKHOUSE_CLIENT -q "create table distinct_in_order_explain (a int, b int, c int) engine=MergeTree() order by (a, b)"
|
||||
$CLICKHOUSE_CLIENT -q "insert into distinct_in_order_explain select number % number, number % 5, number % 10 from numbers(1,10)"
|
||||
$CLICKHOUSE_CLIENT -q "insert into distinct_in_order_explain select number % number, number % 5, number % 10 from numbers(1,10)"
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "select '-- disable optimize_distinct_in_order'"
|
||||
$CLICKHOUSE_CLIENT -q "select '-- distinct all primary key columns -> ordinary distinct'"
|
||||
@ -33,6 +38,9 @@ $CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a,
|
||||
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by the same columns -> pre-distinct and final distinct optimization'"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain order by a, b" | eval $FIND_DISTINCT
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by columns are prefix of distinct columns -> pre-distinct and final distinct optimization'"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain order by a" | eval $FIND_DISTINCT
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "select '-- distinct with primary key prefix and order by column in distinct but non-primary key prefix -> pre-distinct and final distinct optimization'"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b, c from distinct_in_order_explain order by c" | eval $FIND_DISTINCT
|
||||
|
||||
@ -51,4 +59,40 @@ $CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b,
|
||||
$CLICKHOUSE_CLIENT -q "select '-- distinct with non-primary key prefix and order by _const_ column in distinct -> ordinary distinct'"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b, 1 as x from distinct_in_order_explain order by x" | eval $FIND_DISTINCT
|
||||
|
||||
echo "-- Check reading in order for distinct"
|
||||
echo "-- disabled, distinct columns match sorting key"
|
||||
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$DISABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
|
||||
echo "-- enabled, distinct columns match sorting key"
|
||||
# read_in_order_two_level_merge_threshold is set here to avoid repeating MergeTreeInOrder in output
|
||||
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
|
||||
echo "-- enabled, distinct columns form prefix of sorting key"
|
||||
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, b from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
|
||||
echo "-- enabled, distinct columns DON't form prefix of sorting key"
|
||||
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct b from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
|
||||
echo "-- enabled, distinct columns contains constant columns, non-const columns form prefix of sorting key"
|
||||
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct 1, a from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
|
||||
echo "-- enabled, distinct columns contains constant columns, non-const columns match prefix of sorting key"
|
||||
$CLICKHOUSE_CLIENT --read_in_order_two_level_merge_threshold=2 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct 1, b, a from distinct_in_order_explain" | eval $FIND_READING_IN_ORDER
|
||||
echo "-- enabled, only part of distinct columns form prefix of sorting key"
|
||||
$CLICKHOUSE_CLIENT --max_threads=0 -nq "$ENABLE_OPTIMIZATION;explain pipeline select distinct a, c from distinct_in_order_explain" | eval $FIND_READING_DEFAULT
|
||||
|
||||
echo "-- enabled, check that sorting properties are propagated from ReadFromMergeTree till preliminary distinct"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;explain plan sorting=1 select distinct b, a from distinct_in_order_explain where a > 0" | eval $FIND_SORTING_PROPERTIES
|
||||
|
||||
echo "-- check that reading in order optimization for ORDER BY and DISTINCT applied correctly in the same query"
|
||||
ENABLE_READ_IN_ORDER="set optimize_read_in_order=1"
|
||||
echo "-- disabled, check that sorting description for ReadFromMergeTree match ORDER BY columns"
|
||||
$CLICKHOUSE_CLIENT -nq "$DISABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a" | eval $FIND_SORTING_PROPERTIES
|
||||
echo "-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization i.e. it contains columns from DISTINCT clause"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a" | eval $FIND_SORTING_PROPERTIES
|
||||
echo "-- enabled, check that ReadFromMergeTree sorting description is overwritten by DISTINCT optimization, but direction used from ORDER BY clause"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a DESC" | eval $FIND_SORTING_PROPERTIES
|
||||
echo "-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (1), - it contains columns from ORDER BY clause"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct a from distinct_in_order_explain order by a, b" | eval $FIND_SORTING_PROPERTIES
|
||||
echo "-- enabled, check that ReadFromMergeTree sorting description is NOT overwritten by DISTINCT optimization (2), - direction used from ORDER BY clause"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;$ENABLE_READ_IN_ORDER;explain plan sorting=1 select distinct b, a from distinct_in_order_explain order by a DESC, b DESC" | eval $FIND_SORTING_PROPERTIES
|
||||
|
||||
echo "-- enabled, check that disabling other 'read in order' optimizations do not disable distinct in order optimization"
|
||||
$CLICKHOUSE_CLIENT -nq "$ENABLE_OPTIMIZATION;set optimize_read_in_order=0;set optimize_aggregation_in_order=0;set optimize_read_in_window_order=0;explain plan sorting=1 select distinct a,b from distinct_in_order_explain" | eval $FIND_SORTING_PROPERTIES
|
||||
|
||||
$CLICKHOUSE_CLIENT -q "drop table if exists distinct_in_order_explain sync"
|
||||
|
@ -1 +1,2 @@
|
||||
22548578304 1048576 104857600 1 0 0 0 s3_disk_cache/ 0
|
||||
2147483648 1048576 104857600 1 0 0 0 s3_cache/ 0
|
||||
2147483648 1048576 104857600 0 0 0 0 s3_cache_2/ 0
|
||||
|
@ -1,3 +1,4 @@
|
||||
-- Tags: no-fasttest
|
||||
|
||||
DESCRIBE FILESYSTEM CACHE 's3_cache';
|
||||
DESCRIBE FILESYSTEM CACHE 's3_cache_2';
|
||||
|
@ -1,4 +1,4 @@
|
||||
-- Tags: no-fasttest, no-ubsan, no-cpu-aarch64
|
||||
-- Tags: no-fasttest, no-ubsan, no-cpu-aarch64, no-backward-compatibility-check
|
||||
|
||||
SET allow_experimental_annoy_index = 1;
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
-- Tags: no-backward-compatibility-check
|
||||
|
||||
drop table if exists test_02381;
|
||||
create table test_02381(a UInt64, b UInt64) ENGINE = MergeTree order by (a, b);
|
||||
insert into test_02381 select number, number * 10 from system.numbers limit 1000000;
|
||||
|
174
tests/queries/0_stateless/02452_json_utf8_validation.reference
Normal file
174
tests/queries/0_stateless/02452_json_utf8_validation.reference
Normal file
@ -0,0 +1,174 @@
|
||||
JSONCompact
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
[
|
||||
["� �"]
|
||||
],
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
JSON
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
[
|
||||
{
|
||||
"s": "� �"
|
||||
}
|
||||
],
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
XML
|
||||
<?xml version='1.0' encoding='UTF-8' ?>
|
||||
<result>
|
||||
<meta>
|
||||
<columns>
|
||||
<column>
|
||||
<name>s</name>
|
||||
<type>String</type>
|
||||
</column>
|
||||
</columns>
|
||||
</meta>
|
||||
<data>
|
||||
<row>
|
||||
<s>� �</s>
|
||||
</row>
|
||||
</data>
|
||||
<rows>1</rows>
|
||||
</result>
|
||||
JSONColumnsWithMetadata
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
{
|
||||
"s": ["� �"]
|
||||
},
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
JSONEachRow
|
||||
{"s":"� �"}
|
||||
JSONCompactEachRow
|
||||
["� �"]
|
||||
JSONColumns
|
||||
{
|
||||
"s": ["� �"]
|
||||
}
|
||||
JSONCompactColumns
|
||||
[
|
||||
["� �"]
|
||||
]
|
||||
JSONObjectEachRow
|
||||
{
|
||||
"row_1": {"s":"� �"}
|
||||
}
|
||||
JSONCompact
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
[
|
||||
["� �"]
|
||||
],
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
JSON
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
[
|
||||
{
|
||||
"s": "� �"
|
||||
}
|
||||
],
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
XML
|
||||
<?xml version='1.0' encoding='UTF-8' ?>
|
||||
<result>
|
||||
<meta>
|
||||
<columns>
|
||||
<column>
|
||||
<name>s</name>
|
||||
<type>String</type>
|
||||
</column>
|
||||
</columns>
|
||||
</meta>
|
||||
<data>
|
||||
<row>
|
||||
<s>� �</s>
|
||||
</row>
|
||||
</data>
|
||||
<rows>1</rows>
|
||||
</result>
|
||||
JSONColumnsWithMetadata
|
||||
{
|
||||
"meta":
|
||||
[
|
||||
{
|
||||
"name": "s",
|
||||
"type": "String"
|
||||
}
|
||||
],
|
||||
|
||||
"data":
|
||||
{
|
||||
"s": ["� �"]
|
||||
},
|
||||
|
||||
"rows": 1
|
||||
}
|
||||
JSONEachRow
|
||||
{"s":"í ¨"}
|
||||
JSONCompactEachRow
|
||||
["í ¨"]
|
||||
JSONColumns
|
||||
{
|
||||
"s": ["í ¨"]
|
||||
}
|
||||
JSONCompactColumns
|
||||
[
|
||||
["í ¨"]
|
||||
]
|
||||
JSONObjectEachRow
|
||||
{
|
||||
"row_1": {"s":"í ¨"}
|
||||
}
|
42
tests/queries/0_stateless/02452_json_utf8_validation.sql
Normal file
42
tests/queries/0_stateless/02452_json_utf8_validation.sql
Normal file
@ -0,0 +1,42 @@
|
||||
SET output_format_write_statistics = 0;
|
||||
SET output_format_json_validate_utf8 = 1;
|
||||
SELECT 'JSONCompact';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompact;
|
||||
SELECT 'JSON';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSON;
|
||||
SELECT 'XML';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT XML;
|
||||
SELECT 'JSONColumnsWithMetadata';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumnsWithMetadata;
|
||||
SELECT 'JSONEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONEachRow;
|
||||
SELECT 'JSONCompactEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactEachRow;
|
||||
SELECT 'JSONColumns';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumns;
|
||||
SELECT 'JSONCompactColumns';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactColumns;
|
||||
SELECT 'JSONObjectEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONObjectEachRow;
|
||||
|
||||
SET output_format_json_validate_utf8 = 0;
|
||||
SELECT 'JSONCompact';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompact;
|
||||
SELECT 'JSON';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSON;
|
||||
SELECT 'XML';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT XML;
|
||||
SELECT 'JSONColumnsWithMetadata';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumnsWithMetadata;
|
||||
SELECT 'JSONEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONEachRow;
|
||||
SELECT 'JSONCompactEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactEachRow;
|
||||
SELECT 'JSONColumns';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONColumns;
|
||||
SELECT 'JSONCompactColumns';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONCompactColumns;
|
||||
SELECT 'JSONObjectEachRow';
|
||||
SELECT '\xED\x20\xA8' AS s FORMAT JSONObjectEachRow;
|
||||
|
||||
|
@ -1,3 +1,5 @@
|
||||
v22.9.2.7-stable 2022-09-23
|
||||
v22.9.1.2603-stable 2022-09-22
|
||||
v22.8.5.29-lts 2022-09-13
|
||||
v22.8.4.7-lts 2022-08-31
|
||||
v22.8.3.13-lts 2022-08-29
|
||||
|
|
Loading…
Reference in New Issue
Block a user