Merge branch 'master' into escape_diag_creds

Alexey Milovidov 2022-08-04 01:59:50 +03:00 committed by GitHub
commit 834cbbedce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
99 changed files with 2448 additions and 547 deletions


@ -3348,6 +3348,10 @@ jobs:
###################################### JEPSEN TESTS #########################################
#############################################################################################
Jepsen:
# This is a special test NOT INCLUDED in FinishCheck
# When it's skipped, all dependent tasks will be skipped too.
# DO NOT add it there
if: contains(github.event.pull_request.labels.*.name, 'jepsen-test')
needs: [BuilderBinRelease]
uses: ./.github/workflows/jepsen.yml
@ -3419,7 +3423,6 @@ jobs:
- SharedBuildSmokeTest
- CompatibilityCheck
- IntegrationTestsFlakyCheck
- Jepsen
runs-on: [self-hosted, style-checker]
steps:
- name: Clear repository


@ -29,7 +29,7 @@ jobs:
fetch-depth: 0
- name: Generate versions
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
GITHUB_TOKEN: ${{ secrets.ROBOT_CLICKHOUSE_COMMIT_TOKEN }}
run: |
./utils/list-versions/list-versions.sh > ./utils/list-versions/version_date.tsv
GID=$(id -g "${UID}")

.gitmodules vendored

@ -201,7 +201,7 @@
[submodule "contrib/boringssl"]
path = contrib/boringssl
url = https://github.com/ClickHouse/boringssl.git
branch = MergeWithUpstream
branch = unknown_branch_from_artur
[submodule "contrib/NuRaft"]
path = contrib/NuRaft
url = https://github.com/ClickHouse/NuRaft.git


@ -10,9 +10,10 @@ The following versions of ClickHouse server are currently being supported with s
| Version | Supported |
|:-|:-|
| 22.7 | ✔️ |
| 22.6 | ✔️ |
| 22.5 | ✔️ |
| 22.4 | ✔️ |
| 22.4 | |
| 22.3 | ✔️ |
| 22.2 | ❌ |
| 22.1 | ❌ |
@ -57,5 +58,5 @@ As the security issue moves from triage, to identified fix, to release planning
## Public Disclosure Timing
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect report date to disclosure date to be on the order of 7 days.
A public disclosure date is negotiated by the ClickHouse maintainers and the bug submitter. We prefer to fully disclose the bug as soon as possible once a user mitigation is available. It is reasonable to delay disclosure when the bug or the fix is not yet fully understood, the solution is not well-tested, or for vendor coordination. The timeframe for disclosure is from immediate (especially if it's already publicly known) to 90 days. For a vulnerability with a straightforward mitigation, we expect the report date to disclosure date to be on the order of 7 days.

contrib/NuRaft vendored

@ -1 +1 @@
Subproject commit e1dc47c1cfd529801a8c94a396a3921a71ae3ccf
Subproject commit 2ef198694e10c86175ee6ead389346d199060437

contrib/boringssl vendored

@ -1 +1 @@
Subproject commit c1e01a441d6db234f4f12e63a7657d1f9e6db9c1
Subproject commit 8061ac62d67953e61b793042e33baf1352e67510


@ -44,6 +44,8 @@
#define HAVE_SETJMP_H
#define HAVE_SYS_STAT_H
#define HAVE_UNISTD_H
#define HAVE_POLL_H
#define HAVE_PTHREAD_H
#define ENABLE_IPV6
#define USE_OPENSSL

contrib/krb5 vendored

@ -1 +1 @@
Subproject commit 5149dea4e2be0f67707383d2682b897c14631374
Subproject commit d879821c7a4c70b0c3ad739d9951d1a2b1903df7

contrib/nats-io vendored

@ -1 +1 @@
Subproject commit 6b2227f36757da090321e2d317569d2bd42c4cc1
Subproject commit 1e2597c54616015077e53a26d56b6bac448eb1b6


@ -18,6 +18,8 @@ elseif(WIN32)
set(NATS_PLATFORM_INCLUDE "apple")
endif()
add_definitions(-DNATS_HAS_TLS)
file(GLOB PS_SOURCES "${NATS_IO_SOURCE_DIR}/${NATS_PLATFORM_INCLUDE}/*.c")
set(SRCS
"${NATS_IO_SOURCE_DIR}/asynccb.c"


@ -29,6 +29,7 @@
"docker/test/util": {
"name": "clickhouse/test-util",
"dependent": [
"docker/packager/binary",
"docker/test/base",
"docker/test/fasttest"
]


@ -1,62 +1,7 @@
# rebuild in #33610
# docker build -t clickhouse/binary-builder .
FROM ubuntu:20.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=14
RUN apt-get update \
&& apt-get install \
apt-transport-https \
apt-utils \
ca-certificates \
dnsutils \
gnupg \
iputils-ping \
lsb-release \
wget \
--yes --no-install-recommends --verbose-versions \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list \
&& apt-get clean
# initial packages
RUN apt-get update \
&& apt-get install \
bash \
build-essential \
ccache \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
curl \
fakeroot \
gdb \
git \
gperf \
lld-${LLVM_VERSION} \
llvm-${LLVM_VERSION} \
llvm-${LLVM_VERSION}-dev \
moreutils \
ninja-build \
pigz \
rename \
software-properties-common \
tzdata \
nasm \
--yes --no-install-recommends \
&& apt-get clean
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
ARG FROM_TAG=latest
FROM clickhouse/test-util:$FROM_TAG
ENV CC=clang-${LLVM_VERSION}
ENV CXX=clang++-${LLVM_VERSION}
@ -119,18 +64,18 @@ ENV GOCACHE=/workdir/
RUN mkdir /workdir && chmod 777 /workdir
WORKDIR /workdir
# FIXME: thread sanitizer is broken in clang-14, we have to build it with clang-13
# NOTE: thread sanitizer is broken in clang-14, we have to build it with clang-15
# https://github.com/ClickHouse/ClickHouse/pull/39450
# https://github.com/google/sanitizers/issues/1540
# https://github.com/google/sanitizers/issues/1552
RUN export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-13 main" >> \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-15 main" >> \
/etc/apt/sources.list.d/clang.list \
&& apt-get update \
&& apt-get install \
clang-13 \
clang-tidy-13 \
clang-15 \
clang-tidy-15 \
--yes --no-install-recommends \
&& apt-get clean


@ -3,7 +3,7 @@ set -x -e
exec &> >(ts)
cache_status () {
ccache_status () {
ccache --show-config ||:
ccache --show-stats ||:
}
@ -48,7 +48,7 @@ if [ -n "$MAKE_DEB" ]; then
fi
cache_status
ccache_status
# clear cache stats
ccache --zero-stats ||:
@ -92,7 +92,7 @@ $SCAN_WRAPPER ninja $NINJA_FLAGS $BUILD_TARGET
ls -la ./programs
cache_status
ccache_status
if [ -n "$MAKE_DEB" ]; then
# No quotes because I want it to expand to nothing if empty.
@ -178,7 +178,8 @@ then
mv "coverity-scan.tgz" /output
fi
cache_status
ccache_status
ccache --evict-older-than 1d
if [ "${CCACHE_DEBUG:-}" == "1" ]
then


@ -215,7 +215,7 @@ def parse_env_variables(
cmake_flags.append(f"-DCMAKE_C_COMPILER={cc}")
cmake_flags.append(f"-DCMAKE_CXX_COMPILER={cxx}")
# Create combined output archive for split build and for performance tests.
# Create combined output archive for shared library build and for performance tests.
if package_type == "coverity":
result.append("COMBINED_OUTPUT=coverity")
result.append('COVERITY_TOKEN="$COVERITY_TOKEN"')
@ -234,6 +234,7 @@ def parse_env_variables(
if cache:
result.append("CCACHE_DIR=/ccache")
result.append("CCACHE_COMPRESSLEVEL=5")
result.append("CCACHE_BASEDIR=/build")
result.append("CCACHE_NOHASHDIR=true")
result.append("CCACHE_COMPILERCHECK=content")
@ -242,7 +243,6 @@ def parse_env_variables(
# 15G is not enough for tidy build
cache_maxsize = "25G"
result.append(f"CCACHE_MAXSIZE={cache_maxsize}")
# result.append("CCACHE_UMASK=777")
if distcc_hosts:
hosts_with_params = [f"{host}/24,lzo" for host in distcc_hosts] + [
@ -269,8 +269,8 @@ def parse_env_variables(
"-DUSE_STATIC_LIBRARIES=0 -DSPLIT_SHARED_LIBRARIES=1"
)
# We can't always build utils because it requires too much space, but
# we have to build them at least in some way in CI. The split build is
# probably the least heavy disk-wise.
# we have to build them at least in some way in CI. The shared library
# build is probably the least heavy disk-wise.
cmake_flags.append("-DENABLE_UTILS=1")
# utils are not included into clickhouse-bundle, so build everything
build_target = "all"
@ -333,7 +333,7 @@ if __name__ == "__main__":
parser.add_argument(
"--compiler",
choices=(
"clang-13", # For TSAN builds, see #39450
"clang-15", # For TSAN builds, see #39450
"clang-14",
"clang-14-darwin",
"clang-14-darwin-aarch64",


@ -3,59 +3,12 @@
ARG FROM_TAG=latest
FROM clickhouse/test-util:$FROM_TAG
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=14
RUN apt-get update \
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
--yes --no-install-recommends --verbose-versions \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] http://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list
# initial packages
RUN apt-get update \
&& apt-get install \
bash \
fakeroot \
ccache \
curl \
software-properties-common \
--yes --no-install-recommends
# Architecture of the image when BuildKit/buildx is used
ARG TARGETARCH
# Special dpkg-deb (https://github.com/ClickHouse-Extras/dpkg) version which is able
# to compress files using pigz (https://zlib.net/pigz/) instead of gzip.
# Significantly increase deb packaging speed and compatible with old systems
RUN arch=${TARGETARCH:-amd64} \
&& curl -Lo /usr/bin/dpkg-deb https://github.com/ClickHouse-Extras/dpkg/releases/download/1.21.1-clickhouse/dpkg-deb-${arch}
RUN apt-get update \
&& apt-get install \
clang-${LLVM_VERSION} \
debhelper \
devscripts \
gdb \
git \
gperf \
lcov \
llvm-${LLVM_VERSION} \
moreutils \
netbase \
perl \
pigz \
pkg-config \
tzdata \
pv \
nasm \
--yes --no-install-recommends
# Sanitizer options for services (clickhouse-server)


@ -3,83 +3,23 @@
ARG FROM_TAG=latest
FROM clickhouse/test-util:$FROM_TAG
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=14
RUN apt-get update \
&& apt-get install ca-certificates lsb-release wget gnupg apt-transport-https \
--yes --no-install-recommends --verbose-versions \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list
# initial packages
RUN apt-get update \
&& apt-get install \
bash \
fakeroot \
ccache \
curl \
software-properties-common \
--yes --no-install-recommends
# Architecture of the image when BuildKit/buildx is used
ARG TARGETARCH
# Special dpkg-deb (https://github.com/ClickHouse-Extras/dpkg) version which is able
# to compress files using pigz (https://zlib.net/pigz/) instead of gzip.
# Significantly increase deb packaging speed and compatible with old systems
RUN arch=${TARGETARCH:-amd64} \
&& curl -Lo /usr/bin/dpkg-deb https://github.com/ClickHouse-Extras/dpkg/releases/download/1.21.1-clickhouse/dpkg-deb-${arch}
RUN apt-get update \
&& apt-get install \
apt-transport-https \
bash \
brotli \
build-essential \
ca-certificates \
ccache \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
curl \
expect \
fakeroot \
gdb \
git \
gperf \
lld-${LLVM_VERSION} \
llvm-${LLVM_VERSION} \
file \
lsof \
moreutils \
ninja-build \
psmisc \
python3 \
python3-lxml \
python3-pip \
python3-requests \
python3-termcolor \
rename \
software-properties-common \
tzdata \
unixodbc \
file \
nasm \
--yes --no-install-recommends
RUN pip3 install numpy scipy pandas Jinja2
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
ARG odbc_driver_url="https://github.com/ClickHouse/clickhouse-odbc/releases/download/v1.1.4.20200302/clickhouse-odbc-1.1.4-Linux.tar.gz"
RUN mkdir -p /tmp/clickhouse-odbc-tmp \


@ -160,9 +160,8 @@ function run_cmake
"-DENABLE_REPLXX=1"
)
# TODO remove this? we don't use ccache anyway. An option would be to download it
# from S3 simultaneously with cloning.
export CCACHE_DIR="$FASTTEST_WORKSPACE/ccache"
export CCACHE_COMPRESSLEVEL=5
export CCACHE_BASEDIR="$FASTTEST_SOURCE"
export CCACHE_NOHASHDIR=true
export CCACHE_COMPILERCHECK=content
@ -191,6 +190,7 @@ function build
gzip "$FASTTEST_OUTPUT/clickhouse-stripped"
fi
ccache --show-stats ||:
ccache --evict-older-than 1d ||:
)
}


@ -218,6 +218,12 @@ clickhouse-client --query "SELECT 'Server successfully started', 'OK'" >> /test_
|| (echo -e 'Server failed to start (see application_errors.txt and clickhouse-server.clean.log)\tFAIL' >> /test_output/test_results.tsv \
&& grep -a "<Error>.*Application" /var/log/clickhouse-server/clickhouse-server.log > /test_output/application_errors.txt)
echo "Get previous release tag"
previous_release_tag=$(clickhouse-client --query="SELECT version()" | get_previous_release_tag)
echo $previous_release_tag
stop
[ -f /var/log/clickhouse-server/clickhouse-server.log ] || echo -e "Server log does not exist\tFAIL"
[ -f /var/log/clickhouse-server/stderr.log ] || echo -e "Stderr log does not exist\tFAIL"
@ -265,10 +271,6 @@ zgrep -Fa " received signal " /test_output/gdb.log > /dev/null \
echo -e "Backward compatibility check\n"
echo "Get previous release tag"
previous_release_tag=$(clickhouse-client --query="SELECT version()" | get_previous_release_tag)
echo $previous_release_tag
echo "Clone previous release repository"
git clone https://github.com/ClickHouse/ClickHouse.git --no-tags --progress --branch=$previous_release_tag --no-recurse-submodules --depth=1 previous_release_repository
@ -278,7 +280,6 @@ mkdir previous_release_package_folder
echo $previous_release_tag | download_release_packets && echo -e 'Download script exit code\tOK' >> /test_output/test_results.tsv \
|| echo -e 'Download script failed\tFAIL' >> /test_output/test_results.tsv
stop
mv /var/log/clickhouse-server/clickhouse-server.log /var/log/clickhouse-server/clickhouse-server.clean.log
# Check if we cloned previous release repository successfully


@ -1,5 +1,82 @@
# rebuild in #33610
# docker build -t clickhouse/test-util .
FROM ubuntu:20.04
# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ENV DEBIAN_FRONTEND=noninteractive LLVM_VERSION=14
RUN apt-get update \
&& apt-get install \
apt-transport-https \
apt-utils \
ca-certificates \
dnsutils \
gnupg \
iputils-ping \
lsb-release \
wget \
--yes --no-install-recommends --verbose-versions \
&& export LLVM_PUBKEY_HASH="bda960a8da687a275a2078d43c111d66b1c6a893a3275271beedf266c1ff4a0cdecb429c7a5cccf9f486ea7aa43fd27f" \
&& wget -nv -O /tmp/llvm-snapshot.gpg.key https://apt.llvm.org/llvm-snapshot.gpg.key \
&& echo "${LLVM_PUBKEY_HASH} /tmp/llvm-snapshot.gpg.key" | sha384sum -c \
&& apt-key add /tmp/llvm-snapshot.gpg.key \
&& export CODENAME="$(lsb_release --codename --short | tr 'A-Z' 'a-z')" \
&& echo "deb [trusted=yes] https://apt.llvm.org/${CODENAME}/ llvm-toolchain-${CODENAME}-${LLVM_VERSION} main" >> \
/etc/apt/sources.list \
&& apt-get clean
# initial packages
RUN apt-get update \
&& apt-get install \
bash \
bsdmainutils \
build-essential \
clang-${LLVM_VERSION} \
clang-tidy-${LLVM_VERSION} \
cmake \
curl \
fakeroot \
gdb \
git \
gperf \
lld-${LLVM_VERSION} \
llvm-${LLVM_VERSION} \
llvm-${LLVM_VERSION}-dev \
moreutils \
nasm \
ninja-build \
pigz \
rename \
software-properties-common \
tzdata \
--yes --no-install-recommends \
&& apt-get clean
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld
ARG CCACHE_VERSION=4.6.1
RUN mkdir /tmp/ccache \
&& cd /tmp/ccache \
&& curl -L \
-O https://github.com/ccache/ccache/releases/download/v$CCACHE_VERSION/ccache-$CCACHE_VERSION.tar.xz \
-O https://github.com/ccache/ccache/releases/download/v$CCACHE_VERSION/ccache-$CCACHE_VERSION.tar.xz.asc \
&& gpg --recv-keys --keyserver hkps://keyserver.ubuntu.com 5A939A71A46792CF57866A51996DDA075594ADB8 \
&& gpg --verify ccache-4.6.1.tar.xz.asc \
&& tar xf ccache-$CCACHE_VERSION.tar.xz \
&& cd /tmp/ccache/ccache-$CCACHE_VERSION \
&& cmake -DCMAKE_INSTALL_PREFIX=/usr \
-DCMAKE_BUILD_TYPE=None \
-DZSTD_FROM_INTERNET=ON \
-DREDIS_STORAGE_BACKEND=OFF \
-Wno-dev \
-B build \
-S . \
&& make VERBOSE=1 -C build \
&& make install -C build \
&& cd / \
&& rm -rf /tmp/ccache
COPY process_functional_tests_result.py /


@ -0,0 +1,24 @@
---
sidebar_position: 1
sidebar_label: 2022
---
# 2022 Changelog
### ClickHouse release v22.7.2.15-stable (f843089624e) FIXME as compared to v22.7.1.2484-stable (f4f05ec786a)
#### Bug Fix
* Backported in [#39750](https://github.com/ClickHouse/ClickHouse/issues/39750): Fix seeking while reading from encrypted disk. This PR fixes [#38381](https://github.com/ClickHouse/ClickHouse/issues/38381). [#39687](https://github.com/ClickHouse/ClickHouse/pull/39687) ([Vitaly Baranov](https://github.com/vitlibar)).
#### Bug Fix (user-visible misbehavior in official stable or prestable release)
* Backported in [#39591](https://github.com/ClickHouse/ClickHouse/issues/39591): Fix data race and possible heap-buffer-overflow in Avro format. Closes [#39094](https://github.com/ClickHouse/ClickHouse/issues/39094) Closes [#33652](https://github.com/ClickHouse/ClickHouse/issues/33652). [#39498](https://github.com/ClickHouse/ClickHouse/pull/39498) ([Kruglov Pavel](https://github.com/Avogar)).
* Backported in [#39613](https://github.com/ClickHouse/ClickHouse/issues/39613): Fix bug with maxsplit argument for splitByChar, which was not working correctly. [#39552](https://github.com/ClickHouse/ClickHouse/pull/39552) ([filimonov](https://github.com/filimonov)).
* Backported in [#39792](https://github.com/ClickHouse/ClickHouse/issues/39792): Fix wrong index analysis with tuples and operator `IN`, which could lead to wrong query result. [#39752](https://github.com/ClickHouse/ClickHouse/pull/39752) ([Anton Popov](https://github.com/CurtizJ)).
* Backported in [#39837](https://github.com/ClickHouse/ClickHouse/issues/39837): Fix `CANNOT_READ_ALL_DATA` exception with `local_filesystem_read_method=pread_threadpool`. This bug affected only Linux kernel version 5.9 and 5.10 according to [man](https://manpages.debian.org/testing/manpages-dev/preadv2.2.en.html#BUGS). [#39800](https://github.com/ClickHouse/ClickHouse/pull/39800) ([Anton Popov](https://github.com/CurtizJ)).
#### NOT FOR CHANGELOG / INSIGNIFICANT
* Replace MemoryTrackerBlockerInThread to LockMemoryExceptionInThread [#39619](https://github.com/ClickHouse/ClickHouse/pull/39619) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Change mysql-odbc url [#39702](https://github.com/ClickHouse/ClickHouse/pull/39702) ([Mikhail f. Shiryaev](https://github.com/Felixoid)).


@ -29,7 +29,7 @@ ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `schema`, `use
## Data Types Support {#data_types-support}
| PostgerSQL | ClickHouse |
| PostgreSQL | ClickHouse |
|------------------|--------------------------------------------------------------|
| DATE | [Date](../../sql-reference/data-types/date.md) |
| TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |


@ -22,7 +22,7 @@ Consider using the [sipHash64](#hash_functions-siphash64) function instead.
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -69,7 +69,7 @@ Function [interprets](../../sql-reference/functions/type-conversion-functions.md
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -99,7 +99,7 @@ sipHash128(par1,...)
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned value**
@ -135,7 +135,7 @@ This is a fast non-cryptographic hash function. It uses the CityHash algorithm f
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -275,7 +275,7 @@ These functions use the `Fingerprint64` and `Hash64` methods respectively from a
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -401,7 +401,7 @@ metroHash64(par1, ...)
**Arguments**
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
The function takes a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -436,7 +436,7 @@ murmurHash2_64(par1, ...)
**Arguments**
Both functions take a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
Both functions take a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
@ -504,7 +504,7 @@ murmurHash3_64(par1, ...)
**Arguments**
Both functions take a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md).
Both functions take a variable number of input parameters. Arguments can be any of the [supported data types](../../sql-reference/data-types/index.md). For some data types calculated value of hash function may be the same for the same values even if types of arguments differ (integers of different size, named and unnamed `Tuple` with the same data, `Map` and the corresponding `Array(Tuple(key, value))` type with the same data).
**Returned Value**
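The note added throughout this file can be sanity-checked with a short query. This is an illustrative sketch, not part of the diff: it compares hashes of the same value supplied as integers of different widths and as a named versus unnamed `Tuple`; per the note, the hashes may or may not coincide depending on the argument types.
```
SELECT
    sipHash64(toUInt8(1))  AS h_uint8,
    sipHash64(toUInt64(1)) AS h_uint64,
    h_uint8 = h_uint64     AS ints_collide,
    -- named vs unnamed Tuple with the same data, per the note above
    sipHash64(tuple(1, 'a')) = sipHash64(CAST((1, 'a'), 'Tuple(id UInt8, s String)')) AS tuples_collide;
```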


@ -29,7 +29,7 @@ ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `schema`, `use
## Поддерживаемые типы данных {#data_types-support}
| PostgerSQL | ClickHouse |
| PostgreSQL | ClickHouse |
|------------------|--------------------------------------------------------------|
| DATE | [Date](../../sql-reference/data-types/date.md) |
| TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |


@ -29,7 +29,7 @@ ENGINE = PostgreSQL('host:port', 'database', 'user', 'password'[, `use_table_cac
## 支持的数据类型 {#data_types-support}
| PostgerSQL | ClickHouse |
| PostgreSQL | ClickHouse |
|------------------|--------------------------------------------------------------|
| DATE | [Date](../../sql-reference/data-types/date.md) |
| TIMESTAMP | [DateTime](../../sql-reference/data-types/datetime.md) |


@ -0,0 +1,8 @@
position: 10
label: 'Improving Query Performance'
collapsible: true
collapsed: true
link:
type: generated-index
title: Improving Query Performance
slug: /zh/guides/improving-query-performance


@ -0,0 +1,167 @@
---
sidebar_label: Data Skipping Indexes
sidebar_position: 2
---
# Understanding ClickHouse Data Skipping Indexes
### Data Skipping Indexes
Many factors affect ClickHouse query performance. The critical element in most scenarios is whether ClickHouse can use the primary key when evaluating the query's WHERE clause conditions, so choosing a primary key that applies to the most common query patterns is essential for good table design.
However, no matter how carefully the primary key is tuned, query use cases will inevitably arise that cannot make efficient use of it. Users commonly rely on ClickHouse for time-series data, but they often want to analyze that same data according to other business dimensions, such as customer id, website URL, or product number. In that case, query performance can be considerably worse, because a full scan of each column value may be needed to apply the WHERE clause condition. While ClickHouse is still relatively fast in those circumstances, evaluating millions or billions of individual values will cause such "non-indexed" queries to execute much more slowly than queries based on the primary key.
In a traditional relational database, one approach to this problem is to attach one or more "secondary" indexes to a table. This is a B-tree structure that allows the database to find all matching rows on disk in O(log(n)) time instead of O(n) time (a table scan), where n is the number of rows. However, this type of secondary index will not work for ClickHouse (or other column-oriented databases), because there are no individual rows on disk to add to the index.
Instead, ClickHouse provides a different type of index, which in specific circumstances can significantly improve query speed. These structures are labeled data skipping indexes, because they enable ClickHouse to skip reading blocks of data that are guaranteed to contain no matching values.
### Basic Operation
Data skipping indexes can only be used with the MergeTree family of table engines. Each skipping index has four primary arguments:
- Index name. The index name is used to create the index file in each partition. It is also required as a parameter when dropping or materializing the index.
- Index expression. The index expression is used to calculate the set of values stored in the index. It can be a combination of columns, simple operators, and a subset of functions.
- TYPE. The type of the index controls the calculation that determines whether it is possible to skip reading and evaluating each index block.
- GRANULARITY. Each indexed block consists of granules. For example, if the primary table index granularity is 8192 rows and the index GRANULARITY is 4, each indexed "block" will be 32768 rows.
When a user creates a data skipping index, there will be two additional files in each data part directory of the table:
- skp_idx_{index_name}.idx, which contains the ordered expression values.
- skp_idx_{index_name}.mrk2, which contains the corresponding offsets into the associated data column files.
If some portion of the WHERE clause filtering condition matches the skip index expression when executing a query and reading the relevant column files, ClickHouse will use the index file data to determine whether each relevant block of data must be processed or can be bypassed (assuming that the block has not already been excluded by applying the primary key index). Here is a very simple example: consider the following table loaded with predictable data.
```
CREATE TABLE skip_table
(
my_key UInt64,
my_value UInt64
)
ENGINE MergeTree primary key my_key
SETTINGS index_granularity=8192;
INSERT INTO skip_table SELECT number, intDiv(number,4096) FROM numbers(100000000);
```
When executing a simple query that does not use the primary key, all 100 million entries in the my_value column are scanned:
```
SELECT * FROM skip_table WHERE my_value IN (125, 700)
┌─my_key─┬─my_value─┐
│ 512000 │ 125 │
│ 512001 │ 125 │
│ ... | ... |
└────────┴──────────┘
8192 rows in set. Elapsed: 0.079 sec. Processed 100.00 million rows, 800.10 MB (1.26 billion rows/s., 10.10 GB/s.
```
Add a basic skipping index:
```
ALTER TABLE skip_table ADD INDEX vix my_value TYPE set(100) GRANULARITY 2;
```
Normally, skip indexes are only applied to newly inserted data, so merely adding the index will not affect the query above.
To index the data that already exists, run:
```
ALTER TABLE skip_table MATERIALIZE INDEX vix;
```
Rerun the query:
```
SELECT * FROM skip_table WHERE my_value IN (125, 700)
┌─my_key─┬─my_value─┐
│ 512000 │ 125 │
│ 512001 │ 125 │
│ ... | ... |
└────────┴──────────┘
8192 rows in set. Elapsed: 0.051 sec. Processed 32.77 thousand rows, 360.45 KB (643.75 thousand rows/s., 7.08 MB/s.)
```
Instead of processing 100 million rows and 800 MB of data, ClickHouse has only read and analyzed 32768 rows and 360 KB — the data of four granules.
The figure below shows this more visually: how the 4096 rows with my_value = 125 were read and selected, and how the following rows were skipped without being read from disk:
![Simple Skip](../../../en/guides/improving-query-performance/images/simple_skip.svg)
Users can access detailed information about skip index usage by enabling tracing when executing queries. In clickhouse-client, set send_logs_level:
```
SET send_logs_level='trace';
```
This will provide useful debugging information when trying to tune query SQL and table indexes. In the example above, the debug log shows that the skip index dropped all but two granules:
```
<Debug> default.skip_table (933d4b2c-8cea-4bf9-8c93-c56e900eefd1) (SelectExecutor): Index `vix` has dropped 6102/6104 granules.
```
### Skip Index Types
#### minmax
This lightweight index type requires no parameters. It stores the minimum and maximum values of the index expression for each block (if the expression is a tuple, it separately stores the values for each member of the tuple). This type is ideal for columns that tend to be loosely sorted by value. This index type is usually the least expensive to apply during query processing.
This type of index only works with scalar or tuple expressions — the index will never be applied to expressions that return an array or map data type.
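For a quick illustration of the minmax type, here is a hedged sketch that reuses the skip_table and my_value column from the example above; the index name val_minmax is ours and does not appear in the original text.
```
-- store the min/max of my_value for every 4 granules
ALTER TABLE skip_table ADD INDEX val_minmax my_value TYPE minmax GRANULARITY 4;
-- build the index for data that already exists in the table
ALTER TABLE skip_table MATERIALIZE INDEX val_minmax;
```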
#### set
This lightweight index type accepts a single parameter, max_size, the maximum size of the value set per block (0 permits an unlimited number of discrete values). The set contains all values in the block (or is empty if the number of values exceeds max_size). This index type works well with columns of low cardinality within each set of granules (essentially, values that are "clumped together") but higher cardinality overall.
The cost, performance, and effectiveness of this index depend on the cardinality within blocks. If each block contains a large number of unique values, either evaluating the query condition against a large index set will be very expensive, or the index will not be applied because it is empty after exceeding max_size.
#### Bloom Filter Types
A Bloom filter is a data structure that allows space-efficient testing of set membership at the cost of a slight chance of false positives. A false positive is not a significant concern in the skip index use case, because the only disadvantage is reading a few unnecessary blocks. However, the potential for false positives does mean that the indexed expression should be expected to be true; otherwise valid data may be skipped.
Because Bloom filters can handle testing for a large number of discrete values more efficiently, they are appropriate for conditional expressions that produce many value tests. In particular, a Bloom filter index can be applied to arrays, where every value of the array is tested, and to maps, by converting either the keys or the values to an array using the mapKeys or mapValues function.
There are three data skipping index types based on Bloom filters (a sketch of their definitions follows this list):
* The basic **bloom_filter**, which takes a single optional parameter for the allowed "false positive" rate between 0 and 1 (if unspecified, .025 is used).
* The specialized **tokenbf_v1**. It takes three parameters, all related to tuning the Bloom filter used: (1) the size of the filter in bytes (larger filters have fewer false positives at some cost in storage), (2) the number of hash functions applied (more hash functions reduce false positives), and (3) the seed for the Bloom filter hash functions. See [here](https://hur.st/bloomfilter/) for more detail on how these parameters affect Bloom filter functionality. This index works only for String, FixedString, and Map data types. The input expression is split into character sequences separated by non-alphanumeric characters. For example, a column value of `This is a candidate for a "full text" search` would be split into `This` `is` `a` `candidate` `for` `full` `text` `search`. It is intended for use with LIKE, EQUALS, IN, hasToken() and similar searches for words and other values within longer strings. For example, one possible use might be searching for a small number of class names or line numbers in a column of free-form application log lines.
* The more specialized **ngrambf_v1**. This index functions the same as tokenbf_v1, but takes one additional parameter before the Bloom filter settings: the size of the ngrams to index. An ngram is any character string of length n, so if n is 4, `A short string` would be split into `A sh`, ` sho`, `shor`, `hort`, `ort `, `rt s`, `t st`, ` str`, `stri`, `trin`, `ring`. This index can also be useful for text searches, particularly for languages without word breaks, such as Chinese.
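Below is a hedged sketch of what definitions of these three index types can look like. The api_logs table and the column and index names here are hypothetical and do not come from the original text.
```
CREATE TABLE api_logs
(
    event_time DateTime,
    url        String,
    message    String
)
ENGINE = MergeTree
ORDER BY event_time;

-- basic Bloom filter with a 2.5% false-positive rate (the default mentioned above)
ALTER TABLE api_logs ADD INDEX url_bf     url     TYPE bloom_filter(0.025)        GRANULARITY 4;
-- tokenbf_v1(filter size in bytes, number of hash functions, seed)
ALTER TABLE api_logs ADD INDEX msg_tokens message TYPE tokenbf_v1(32768, 3, 0)    GRANULARITY 1;
-- ngrambf_v1(ngram size, filter size in bytes, number of hash functions, seed)
ALTER TABLE api_logs ADD INDEX msg_ngrams message TYPE ngrambf_v1(4, 32768, 3, 0) GRANULARITY 1;
```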
### Skip Index Functions
The core purpose of data skipping indexes is to limit the amount of data analyzed by popular queries. Given the analytic nature of ClickHouse data, the pattern of those queries in most cases includes functional expressions. Accordingly, skip indexes must interact correctly with common functions to be efficient. This can happen either when:
* data is inserted and the index is defined as a functional expression (with the result of the expression stored in the index files), or
* the query is processed and the expression is applied to the stored index values to determine whether to exclude the block.
Each type of skip index works with a subset of available functions, listed [here](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/#functions-support). In general, set indexes and Bloom-filter-based indexes (another type of set index) are both unordered and therefore do not work with ranges. In contrast, minmax indexes work particularly well with ranges, since determining whether ranges intersect is very fast. The effectiveness of the partial-match functions LIKE, startsWith, endsWith, and hasToken depends on the index type used, the index expression, and the particular shape of the data.
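One hedged way to check whether a given predicate can actually make use of an index is to ask for the index information of the query plan. This sketch is ours (not part of the original text) and reuses the skip_table and the vix index created earlier:
```
EXPLAIN indexes = 1
SELECT * FROM skip_table WHERE my_value IN (125, 700);
```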
### Skip Index Settings
There are two available settings that apply to skip indexes (a usage sketch follows this list):
* **use_skip_indexes** (0 or 1, enabled by default). Not all queries can efficiently use skip indexes. If a particular filtering condition is likely to include a large number of granules, applying the data skipping index incurs an unnecessary, and sometimes significant, cost. Set the value to 0 for queries that are unlikely to benefit from any skip index.
* **force_data_skipping_indexes** (a comma-separated list of index names). This setting can be used to prevent some kinds of inefficient queries. In circumstances where querying a table is too expensive unless a skip index is used, using this setting with one or more index names will return an exception for any query that does not use a listed index. This prevents a poorly written query from consuming server resources.
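A minimal sketch of applying one of these settings per query, reusing the skip_table from the example above:
```
-- disable skip-index evaluation for a query that is unlikely to benefit from it
SELECT count()
FROM skip_table
WHERE my_key > 100000
SETTINGS use_skip_indexes = 0;
```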
### Skip Index Best Practices
Skip indexes are not intuitive, especially for users accustomed to secondary row-based indexes from the RDBMS world or to inverted indexes from document stores. To get any benefit, applying a ClickHouse data skipping index must avoid enough granule reads to offset the cost of calculating the index. Critically, if a value occurs even once in an indexed block, it means the entire block must be read into memory and evaluated, and the index cost has been incurred for nothing.
Consider the following data distribution:
![Bad Skip!](../../../en/guides/improving-query-performance/images/bad_skip_1.svg)
Assume the primary/order-by key is timestamp, and there is an index on visitor_id. Consider the following query:
`SELECT timestamp, url FROM table WHERE visitor_id = 1001`
A traditional secondary index would be very advantageous with this kind of data distribution. Instead of reading all 32678 rows to find the 5 rows with the requested visitor_id, the secondary index would contain just the five row locations, and only those five rows would be read from disk. The exact opposite is true for a ClickHouse data skipping index: all 32678 values in the visitor_id column will be tested regardless of the type of skip index.
Accordingly, the natural impulse to try to speed up ClickHouse queries by simply adding an index to key columns is often incorrect. This advanced functionality should only be used after investigating other alternatives, such as modifying the primary key (see [How to Pick a Primary Key](../improving-query-performance/sparse-primary-indexes.md)), using projections, or using materialized views. Even when a skip index is appropriate, careful tuning of both the index and the table is often necessary.
In most cases, a useful skip index requires a strong correlation between the primary key and the targeted non-primary column/expression. If there is no correlation (as in the figure above), the likelihood that at least one row in a block of several thousand values satisfies the filtering condition is high, and only a few blocks will be skipped. Conversely, if a range of values of the primary key (such as time of day) is strongly correlated with the values in the potential index column (such as television viewer age), a minmax-type index is likely to be beneficial. Note that it may be possible to increase this correlation when inserting data, either by including additional columns in the sorting/ORDER BY key, or by batching inserts so that values associated with the primary key are grouped at insert time. For example, even if the primary key is a timestamp covering events for a large number of sites, all events for a particular site_id could be grouped and inserted together by the ingesting process, producing many granules that contain only a few site ids, so many blocks can be skipped when searching by a specific site_id value.
Another good candidate for a skip index is a high-cardinality expression where any single value is relatively sparse in the data. One possible example is an observability platform that tracks error codes in API requests. Certain error codes, while rare in the data, may be especially important for searches. A set skip index on the error_code column would allow bypassing the vast majority of blocks that do not contain errors, significantly improving error-focused queries.
Finally, the key best practice is to test, test, test. Again, unlike B-tree secondary indexes or inverted indexes used for searching documents, skip index behavior is not easy to predict. Adding them to a table incurs a meaningful cost both on data ingest and on queries that, for a variety of reasons, cannot benefit from the index. They should always be tested on real-world types of data, and the testing should include variations of the type, granularity size, and other parameters. Testing will often reveal patterns and pitfalls that are not obvious from thought experiments alone.

File diff suppressed because it is too large.


@ -26,7 +26,7 @@ if [ "$1" = configure ] || [ -n "$not_deb_os" ]; then
${CLICKHOUSE_GENERIC_PROGRAM} install --user "${CLICKHOUSE_USER}" --group "${CLICKHOUSE_GROUP}" --pid-path "${CLICKHOUSE_PIDDIR}" --config-path "${CLICKHOUSE_CONFDIR}" --binary-path "${CLICKHOUSE_BINDIR}" --log-path "${CLICKHOUSE_LOGDIR}" --data-path "${CLICKHOUSE_DATADIR}"
if [ -x "/bin/systemctl" ] && [ -f /etc/systemd/system/clickhouse-server.service ] && [ -d /run/systemd/system ]; then
if [ -x "/bin/systemctl" ] && [ -f /lib/systemd/system/clickhouse-server.service ] && [ -d /run/systemd/system ]; then
# if old rc.d service present - remove it
if [ -x "/etc/init.d/clickhouse-server" ] && [ -x "/usr/sbin/update-rc.d" ]; then
/usr/sbin/update-rc.d clickhouse-server remove
@ -44,4 +44,16 @@ if [ "$1" = configure ] || [ -n "$not_deb_os" ]; then
fi
fi
fi
# /etc/systemd/system/clickhouse-server.service shouldn't be distributed by the package, but it was
# here we delete the service file if it was from our package
if [ -f /etc/systemd/system/clickhouse-server.service ]; then
SHA256=$(sha256sum /etc/systemd/system/clickhouse-server.service | cut -d' ' -f1)
for ref_sum in 7769a14773e811a56f67fd70f7960147217f5e68f746010aec96722e24d289bb 22890012047ea84fbfcebd6e291fe2ef2185cbfdd94a0294e13c8bf9959f58f8 b7790ae57156663c723f92e75ac2508453bf0a7b7e8313bb8081da99e5e88cd3 d1dcc1dbe92dab3ae17baa395f36abf1876b4513df272bf021484923e0111eef ac29ddd32a02eb31670bf5f0018c5d8a3cc006ca7ea572dcf717cb42310dcad7 c62d23052532a70115414833b500b266647d3924eb006a6f3eb673ff0d55f8fa b6b200ffb517afc2b9cf9e25ad8a4afdc0dad5a045bddbfb0174f84cc5a959ed; do
if [ "$SHA256" = "$ref_sum" ]; then
rm /etc/systemd/system/clickhouse-server.service
break
fi
done
fi
fi


@ -323,12 +323,28 @@ void LocalServer::setupUsers()
auto & access_control = global_context->getAccessControl();
access_control.setNoPasswordAllowed(config().getBool("allow_no_password", true));
access_control.setPlaintextPasswordAllowed(config().getBool("allow_plaintext_password", true));
if (config().has("users_config") || config().has("config-file") || fs::exists("config.xml"))
if (config().has("config-file") || fs::exists("config.xml"))
{
const auto users_config_path = config().getString("users_config", config().getString("config-file", "config.xml"));
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
users_config = loaded_config.configuration;
String config_path = config().getString("config-file", "");
bool has_user_directories = config().has("user_directories");
const auto config_dir = fs::path{config_path}.remove_filename().string();
String users_config_path = config().getString("users_config", "");
if (users_config_path.empty() && has_user_directories)
{
users_config_path = config().getString("user_directories.users_xml.path");
if (fs::path(users_config_path).is_relative() && fs::exists(fs::path(config_dir) / users_config_path))
users_config_path = fs::path(config_dir) / users_config_path;
}
if (users_config_path.empty())
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
else
{
ConfigProcessor config_processor(users_config_path);
const auto loaded_config = config_processor.loadConfig();
users_config = loaded_config.configuration;
}
}
else
users_config = getConfigurationFromXMLString(minimal_default_user_xml);
@ -338,7 +354,6 @@ void LocalServer::setupUsers()
throw Exception("Can't load config for users", ErrorCodes::CANNOT_LOAD_CONFIG);
}
void LocalServer::connect()
{
connection_parameters = ConnectionParameters(config());


@ -152,8 +152,8 @@ template <typename Data>
class AggregateFunctionDistinct : public IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct<Data>>
{
private:
static constexpr auto prefix_size = sizeof(Data);
AggregateFunctionPtr nested_func;
size_t prefix_size;
size_t arguments_num;
AggregateDataPtr getNestedPlace(AggregateDataPtr __restrict place) const noexcept
@ -170,7 +170,11 @@ public:
AggregateFunctionDistinct(AggregateFunctionPtr nested_func_, const DataTypes & arguments, const Array & params_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionDistinct>(arguments, params_)
, nested_func(nested_func_)
, arguments_num(arguments.size()) {}
, arguments_num(arguments.size())
{
size_t nested_size = nested_func->alignOfData();
prefix_size = (sizeof(Data) + nested_size - 1) / nested_size * nested_size;
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{


@ -12,12 +12,14 @@
#include <Common/RadixSort.h>
#include <Common/SipHash.h>
#include <Common/WeakHash.h>
#include <Common/TargetSpecific.h>
#include <Common/assert_cast.h>
#include <base/sort.h>
#include <base/unaligned.h>
#include <base/bit_cast.h>
#include <base/scope_guard.h>
#include <bit>
#include <cmath>
#include <cstring>
@ -25,6 +27,10 @@
# include <emmintrin.h>
#endif
#if USE_MULTITARGET_CODE
# include <immintrin.h>
#endif
#if USE_EMBEDDED_COMPILER
#include <DataTypes/Native.h>
#include <llvm/IR/IRBuilder.h>
@ -471,6 +477,115 @@ void ColumnVector<T>::insertRangeFrom(const IColumn & src, size_t start, size_t
memcpy(data.data() + old_size, &src_vec.data[start], length * sizeof(data[0]));
}
static inline UInt64 blsr(UInt64 mask)
{
#ifdef __BMI__
return _blsr_u64(mask);
#else
return mask & (mask-1);
#endif
}
DECLARE_DEFAULT_CODE(
template <typename T, typename Container, size_t SIMD_BYTES>
inline void doFilterAligned(const UInt8 *& filt_pos, const UInt8 *& filt_end_aligned, const T *& data_pos, Container & res_data)
{
while (filt_pos < filt_end_aligned)
{
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
if (0xffffffffffffffff == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
while (mask)
{
size_t index = std::countr_zero(mask);
res_data.push_back(data_pos[index]);
mask = blsr(mask);
}
}
filt_pos += SIMD_BYTES;
data_pos += SIMD_BYTES;
}
}
)
DECLARE_AVX512VBMI2_SPECIFIC_CODE(
template <size_t ELEMENT_WIDTH>
inline void compressStoreAVX512(const void *src, void *dst, const UInt64 mask)
{
__m512i vsrc = _mm512_loadu_si512(src);
if constexpr (ELEMENT_WIDTH == 1)
_mm512_mask_compressstoreu_epi8(dst, static_cast<__mmask64>(mask), vsrc);
else if constexpr (ELEMENT_WIDTH == 2)
_mm512_mask_compressstoreu_epi16(dst, static_cast<__mmask32>(mask), vsrc);
else if constexpr (ELEMENT_WIDTH == 4)
_mm512_mask_compressstoreu_epi32(dst, static_cast<__mmask16>(mask), vsrc);
else if constexpr (ELEMENT_WIDTH == 8)
_mm512_mask_compressstoreu_epi64(dst, static_cast<__mmask8>(mask), vsrc);
}
template <typename T, typename Container, size_t SIMD_BYTES>
inline void doFilterAligned(const UInt8 *& filt_pos, const UInt8 *& filt_end_aligned, const T *& data_pos, Container & res_data)
{
static constexpr size_t VEC_LEN = 64; /// AVX512 vector length - 64 bytes
static constexpr size_t ELEMENT_WIDTH = sizeof(T);
static constexpr size_t ELEMENTS_PER_VEC = VEC_LEN / ELEMENT_WIDTH;
static constexpr UInt64 KMASK = 0xffffffffffffffff >> (64 - ELEMENTS_PER_VEC);
size_t current_offset = res_data.size();
size_t reserve_size = res_data.size();
size_t alloc_size = SIMD_BYTES * 2;
while (filt_pos < filt_end_aligned)
{
/// to avoid calling resize too frequently, resize to reserve buffer.
if (reserve_size - current_offset < SIMD_BYTES)
{
reserve_size += alloc_size;
res_data.resize(reserve_size);
alloc_size *= 2;
}
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
if (0xffffffffffffffff == mask)
{
for (size_t i = 0; i < SIMD_BYTES; i += ELEMENTS_PER_VEC)
_mm512_storeu_si512(reinterpret_cast<void *>(&res_data[current_offset + i]),
_mm512_loadu_si512(reinterpret_cast<const void *>(data_pos + i)));
current_offset += SIMD_BYTES;
}
else
{
if (mask)
{
for (size_t i = 0; i < SIMD_BYTES; i += ELEMENTS_PER_VEC)
{
compressStoreAVX512<ELEMENT_WIDTH>(reinterpret_cast<const void *>(data_pos + i),
reinterpret_cast<void *>(&res_data[current_offset]), mask & KMASK);
current_offset += std::popcount(mask & KMASK);
/// prepare mask for next iter, if ELEMENTS_PER_VEC = 64, no next iter
if (ELEMENTS_PER_VEC < 64)
{
mask >>= ELEMENTS_PER_VEC;
}
}
}
}
filt_pos += SIMD_BYTES;
data_pos += SIMD_BYTES;
}
/// resize to the real size.
res_data.resize(current_offset);
}
)
template <typename T>
ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_size_hint) const
{
@ -496,31 +611,13 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
static constexpr size_t SIMD_BYTES = 64;
const UInt8 * filt_end_aligned = filt_pos + size / SIMD_BYTES * SIMD_BYTES;
while (filt_pos < filt_end_aligned)
{
UInt64 mask = bytes64MaskToBits64Mask(filt_pos);
if (0xffffffffffffffff == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
while (mask)
{
size_t index = std::countr_zero(mask);
res_data.push_back(data_pos[index]);
#ifdef __BMI__
mask = _blsr_u64(mask);
#else
mask = mask & (mask-1);
#endif
}
}
filt_pos += SIMD_BYTES;
data_pos += SIMD_BYTES;
}
#if USE_MULTITARGET_CODE
static constexpr bool VBMI2_CAPABLE = sizeof(T) == 1 || sizeof(T) == 2 || sizeof(T) == 4 || sizeof(T) == 8;
if (VBMI2_CAPABLE && isArchSupported(TargetArch::AVX512VBMI2))
TargetSpecific::AVX512VBMI2::doFilterAligned<T, Container, SIMD_BYTES>(filt_pos, filt_end_aligned, data_pos, res_data);
else
#endif
TargetSpecific::Default::doFilterAligned<T, Container, SIMD_BYTES>(filt_pos, filt_end_aligned, data_pos, res_data);
while (filt_pos < filt_end)
{


@ -0,0 +1,91 @@
#include <typeinfo>
#include <vector>
#include <Columns/ColumnsNumber.h>
#include <Common/randomSeed.h>
#include <gtest/gtest.h>
using namespace DB;
static pcg64 rng(randomSeed());
static constexpr int error_code = 12345;
static constexpr size_t TEST_RUNS = 500;
static constexpr size_t MAX_ROWS = 10000;
static const std::vector<size_t> filter_ratios = {1, 2, 5, 11, 32, 64, 100, 1000};
static const size_t K = filter_ratios.size();
template <typename T>
static MutableColumnPtr createColumn(size_t n)
{
auto column = ColumnVector<T>::create();
auto & values = column->getData();
for (size_t i = 0; i < n; ++i)
{
values.push_back(i);
}
return column;
}
bool checkFilter(const PaddedPODArray<UInt8> &flit, const IColumn & src, const IColumn & dst)
{
size_t n = flit.size();
size_t dst_size = dst.size();
size_t j = 0; /// index of dest
for (size_t i = 0; i < n; ++i)
{
if (flit[i] != 0)
{
if ((dst_size <= j) || (src.compareAt(i, j, dst, 0) != 0))
return false;
j++;
}
}
return dst_size == j; /// filtered size check
}
template <typename T>
static void testFilter()
{
auto test_case = [&](size_t rows, size_t filter_ratio)
{
auto vector_column = createColumn<T>(rows);
PaddedPODArray<UInt8> flit(rows);
for (size_t i = 0; i < rows; ++i)
flit[i] = rng() % filter_ratio == 0;
auto res_column = vector_column->filter(flit, -1);
if (!checkFilter(flit, *vector_column, *res_column))
throw Exception(error_code, "VectorColumn filter failure, type: {}", typeid(T).name());
};
try
{
for (size_t i = 0; i < TEST_RUNS; ++i)
{
size_t rows = rng() % MAX_ROWS + 1;
size_t filter_ratio = filter_ratios[rng() % K];
test_case(rows, filter_ratio);
}
}
catch (const Exception & e)
{
FAIL() << e.displayText();
}
}
TEST(ColumnVector, Filter)
{
testFilter<UInt8>();
testFilter<Int16>();
testFilter<UInt32>();
testFilter<Int64>();
testFilter<UInt128>();
testFilter<Int256>();
testFilter<Float32>();
testFilter<Float64>();
testFilter<UUID>();
}


@ -13,8 +13,6 @@ namespace fs = std::filesystem;
namespace DB
{
constexpr decltype(ConfigReloader::reload_interval) ConfigReloader::reload_interval;
ConfigReloader::ConfigReloader(
const std::string & path_,
const std::string & include_from_path_,


@ -82,6 +82,7 @@ inline bool cpuid(UInt32 op, UInt32 * res) noexcept /// NOLINT
OP(AVX512BW) \
OP(AVX512VL) \
OP(AVX512VBMI) \
OP(AVX512VBMI2) \
OP(PREFETCHWT1) \
OP(SHA) \
OP(ADX) \
@ -302,6 +303,11 @@ bool haveAVX512VBMI() noexcept
return haveAVX512F() && ((CpuInfo(0x7, 0).registers.ecx >> 1) & 1u);
}
bool haveAVX512VBMI2() noexcept
{
return haveAVX512F() && ((CpuInfo(0x7, 0).registers.ecx >> 6) & 1u);
}
bool haveRDRAND() noexcept
{
return CpuInfo(0x0).registers.eax >= 0x7 && ((CpuInfo(0x1).registers.ecx >> 30) & 1u);


@ -57,17 +57,17 @@ inline std::string_view toDescription(OvercommitResult result)
switch (result)
{
case OvercommitResult::NONE:
return "Memory overcommit isn't used. OvercommitTracker isn't set.";
return "Memory overcommit isn't used. OvercommitTracker isn't set";
case OvercommitResult::DISABLED:
return "Memory overcommit isn't used. Waiting time or orvercommit denominator are set to zero.";
return "Memory overcommit isn't used. Waiting time or overcommit denominator are set to zero";
case OvercommitResult::MEMORY_FREED:
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "OvercommitResult::MEMORY_FREED shouldn't be asked for description");
case OvercommitResult::SELECTED:
return "Query was selected to stop by OvercommitTracker.";
return "Query was selected to stop by OvercommitTracker";
case OvercommitResult::TIMEOUTED:
return "Waiting timeout for memory to be freed is reached.";
return "Waiting timeout for memory to be freed is reached";
case OvercommitResult::NOT_ENOUGH_FREED:
return "Memory overcommit has freed not enough memory.";
return "Memory overcommit has freed not enough memory";
}
}


@ -20,6 +20,8 @@ UInt32 getSupportedArchs()
result |= static_cast<UInt32>(TargetArch::AVX512BW);
if (Cpu::CpuFlagsCache::have_AVX512VBMI)
result |= static_cast<UInt32>(TargetArch::AVX512VBMI);
if (Cpu::CpuFlagsCache::have_AVX512VBMI2)
result |= static_cast<UInt32>(TargetArch::AVX512VBMI2);
return result;
}
@ -38,8 +40,9 @@ String toString(TargetArch arch)
case TargetArch::AVX: return "avx";
case TargetArch::AVX2: return "avx2";
case TargetArch::AVX512F: return "avx512f";
case TargetArch::AVX512BW: return "avx512bw";
case TargetArch::AVX512VBMI: return "avx512vbmi";
case TargetArch::AVX512BW: return "avx512bw";
case TargetArch::AVX512VBMI: return "avx512vbmi";
case TargetArch::AVX512VBMI2: return "avx512vbmi2";
}
__builtin_unreachable();


@ -31,7 +31,7 @@
* int funcImpl() {
* return 2;
* }
* ) // DECLARE_DEFAULT_CODE
* ) // DECLARE_AVX2_SPECIFIC_CODE
*
* int func() {
* #if USE_MULTITARGET_CODE
@ -80,8 +80,9 @@ enum class TargetArch : UInt32
AVX = (1 << 1),
AVX2 = (1 << 2),
AVX512F = (1 << 3),
AVX512BW = (1 << 4),
AVX512VBMI = (1 << 5),
AVX512BW = (1 << 4),
AVX512VBMI = (1 << 5),
AVX512VBMI2 = (1 << 6),
};
/// Runtime detection.
@ -100,6 +101,7 @@ String toString(TargetArch arch);
#if defined(__clang__)
#define AVX512VBMI2_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2")))
#define AVX512VBMI_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi")))
#define AVX512BW_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw")))
#define AVX512_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f")))
@ -108,6 +110,8 @@ String toString(TargetArch arch);
#define SSE42_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt")))
#define DEFAULT_FUNCTION_SPECIFIC_ATTRIBUTE
# define BEGIN_AVX512VBMI2_SPECIFIC_CODE \
_Pragma("clang attribute push(__attribute__((target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2\"))),apply_to=function)")
# define BEGIN_AVX512VBMI_SPECIFIC_CODE \
_Pragma("clang attribute push(__attribute__((target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi\"))),apply_to=function)")
# define BEGIN_AVX512BW_SPECIFIC_CODE \
@ -129,6 +133,7 @@ String toString(TargetArch arch);
# define DUMMY_FUNCTION_DEFINITION [[maybe_unused]] void _dummy_function_definition();
#else
#define AVX512VBMI2_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2,tune=native")))
#define AVX512VBMI_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,tune=native")))
#define AVX512BW_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,tune=native")))
#define AVX512_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,tune=native")))
@ -137,6 +142,9 @@ String toString(TargetArch arch);
#define SSE42_FUNCTION_SPECIFIC_ATTRIBUTE __attribute__((target("sse,sse2,sse3,ssse3,sse4,popcnt",tune=native)))
#define DEFAULT_FUNCTION_SPECIFIC_ATTRIBUTE
# define BEGIN_AVX512VBMI2_SPECIFIC_CODE \
_Pragma("GCC push_options") \
_Pragma("GCC target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,avx512vbmi2,tune=native\")")
# define BEGIN_AVX512VBMI_SPECIFIC_CODE \
_Pragma("GCC push_options") \
_Pragma("GCC target(\"sse,sse2,sse3,ssse3,sse4,popcnt,avx,avx2,avx512f,avx512bw,avx512vl,avx512vbmi,tune=native\")")
@ -217,6 +225,16 @@ namespace TargetSpecific::AVX512VBMI { \
} \
END_TARGET_SPECIFIC_CODE
#define DECLARE_AVX512VBMI2_SPECIFIC_CODE(...) \
BEGIN_AVX512VBMI2_SPECIFIC_CODE \
namespace TargetSpecific::AVX512VBMI2 { \
DUMMY_FUNCTION_DEFINITION \
using namespace DB::TargetSpecific::AVX512VBMI2; \
__VA_ARGS__ \
} \
END_TARGET_SPECIFIC_CODE
#else
#define USE_MULTITARGET_CODE 0
@ -229,6 +247,7 @@ END_TARGET_SPECIFIC_CODE
#define DECLARE_AVX512F_SPECIFIC_CODE(...)
#define DECLARE_AVX512BW_SPECIFIC_CODE(...)
#define DECLARE_AVX512VBMI_SPECIFIC_CODE(...)
#define DECLARE_AVX512VBMI2_SPECIFIC_CODE(...)
#endif
@ -245,8 +264,9 @@ DECLARE_SSE42_SPECIFIC_CODE (__VA_ARGS__) \
DECLARE_AVX_SPECIFIC_CODE (__VA_ARGS__) \
DECLARE_AVX2_SPECIFIC_CODE (__VA_ARGS__) \
DECLARE_AVX512F_SPECIFIC_CODE(__VA_ARGS__) \
DECLARE_AVX512BW_SPECIFIC_CODE(__VA_ARGS__) \
DECLARE_AVX512VBMI_SPECIFIC_CODE(__VA_ARGS__)
DECLARE_AVX512BW_SPECIFIC_CODE (__VA_ARGS__) \
DECLARE_AVX512VBMI_SPECIFIC_CODE (__VA_ARGS__) \
DECLARE_AVX512VBMI2_SPECIFIC_CODE (__VA_ARGS__)
DECLARE_DEFAULT_CODE(
constexpr auto BuildArch = TargetArch::Default; /// NOLINT
@ -276,6 +296,9 @@ DECLARE_AVX512VBMI_SPECIFIC_CODE(
constexpr auto BuildArch = TargetArch::AVX512VBMI; /// NOLINT
) // DECLARE_AVX512VBMI_SPECIFIC_CODE
DECLARE_AVX512VBMI2_SPECIFIC_CODE(
constexpr auto BuildArch = TargetArch::AVX512VBMI2; /// NOLINT
) // DECLARE_AVX512VBMI2_SPECIFIC_CODE
/** Runtime Dispatch helpers for class members.
*


@ -573,6 +573,19 @@ nuraft::cb_func::ReturnCode KeeperServer::callbackFunc(nuraft::cb_func::Type typ
entry = nuraft::cs_new<nuraft::log_entry>(entry->get_term(), getZooKeeperLogEntry(request_for_session), entry->get_val_type());
break;
}
case nuraft::cb_func::AppendLogFailed:
{
// we are relying on the fact that requests are being processed under a mutex
// and not a RW lock
auto & entry = *static_cast<LogEntryPtr *>(param->ctx);
assert(entry->get_val_type() == nuraft::app_log);
auto & entry_buf = entry->get_buf();
auto request_for_session = state_machine->parseRequest(entry_buf);
state_machine->rollbackRequest(request_for_session, true);
break;
}
default:
break;
}


@ -125,9 +125,8 @@ void assertDigest(
{
LOG_FATAL(
&Poco::Logger::get("KeeperStateMachine"),
"Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest - {} (digest version "
"{}). Keeper will "
"terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
"Digest for nodes is not matching after {} request of type '{}'.\nExpected digest - {}, actual digest - {} (digest "
"{}). Keeper will terminate to avoid inconsistencies.\nExtra information about the request:\n{}",
committing ? "committing" : "preprocessing",
request.getOpNum(),
first.value,
@ -196,13 +195,21 @@ void KeeperStateMachine::preprocess(const KeeperStorage::RequestForSession & req
return;
std::lock_guard lock(storage_and_responses_lock);
storage->preprocessRequest(
request_for_session.request,
request_for_session.session_id,
request_for_session.time,
request_for_session.zxid,
true /* check_acl */,
request_for_session.digest);
try
{
storage->preprocessRequest(
request_for_session.request,
request_for_session.session_id,
request_for_session.time,
request_for_session.zxid,
true /* check_acl */,
request_for_session.digest);
}
catch (...)
{
rollbackRequest(request_for_session, true);
throw;
}
if (keeper_context->digest_enabled && request_for_session.digest)
assertDigest(*request_for_session.digest, storage->getNodesDigest(false), *request_for_session.request, false);
@ -311,11 +318,16 @@ void KeeperStateMachine::rollback(uint64_t log_idx, nuraft::buffer & data)
if (!request_for_session.zxid)
request_for_session.zxid = log_idx;
rollbackRequest(request_for_session, false);
}
void KeeperStateMachine::rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing)
{
if (request_for_session.request->getOpNum() == Coordination::OpNum::SessionID)
return;
std::lock_guard lock(storage_and_responses_lock);
storage->rollbackRequest(request_for_session.zxid);
storage->rollbackRequest(request_for_session.zxid, allow_missing);
}
nuraft::ptr<nuraft::snapshot> KeeperStateMachine::last_snapshot()


@ -44,6 +44,10 @@ public:
void rollback(uint64_t log_idx, nuraft::buffer & data) override;
// allow_missing - whether the transaction we want to rollback can be missing from storage
// (can happen in case of exception during preprocessing)
void rollbackRequest(const KeeperStorage::RequestForSession & request_for_session, bool allow_missing);
uint64_t last_commit_index() override { return last_committed_idx; }
/// Apply preliminarily saved (save_logical_snp_obj) snapshot to our state.


@ -6,7 +6,7 @@
#include <boost/algorithm/string.hpp>
#include <Poco/Base64Encoder.h>
#include <Poco/SHA1Engine.h>
#include "Common/ZooKeeper/ZooKeeperCommon.h"
#include <Common/ZooKeeper/ZooKeeperCommon.h>
#include <Common/SipHash.h>
#include <Common/ZooKeeper/ZooKeeperConstants.h>
#include <Common/StringUtils/StringUtils.h>
@ -14,6 +14,7 @@
#include <Common/hex.h>
#include <Common/logger_useful.h>
#include <Common/setThreadName.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Coordination/pathUtils.h>
#include <Coordination/KeeperConstants.h>
#include <sstream>
@ -365,10 +366,10 @@ void KeeperStorage::UncommittedState::addDeltas(std::vector<Delta> new_deltas)
{
for (auto & delta : new_deltas)
{
if (!delta.path.empty())
applyDelta(delta);
const auto & added_delta = deltas.emplace_back(std::move(delta));
deltas.push_back(std::move(delta));
if (!added_delta.path.empty())
applyDelta(added_delta);
}
}
@ -2113,14 +2114,30 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(
return results;
}
void KeeperStorage::rollbackRequest(int64_t rollback_zxid)
void KeeperStorage::rollbackRequest(int64_t rollback_zxid, bool allow_missing)
{
if (allow_missing && (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid < rollback_zxid))
return;
if (uncommitted_transactions.empty() || uncommitted_transactions.back().zxid != rollback_zxid)
{
throw Exception(
ErrorCodes::LOGICAL_ERROR, "Trying to rollback invalid ZXID ({}). It should be the last preprocessed.", rollback_zxid);
}
uncommitted_transactions.pop_back();
uncommitted_state.rollback(rollback_zxid);
// if an exception occurs during rollback, the best option is to terminate because we can end up in an inconsistent state
// we block memory tracking so we can avoid terminating if we're rolling back because of a memory limit
MemoryTrackerBlockerInThread temporarily_ignore_any_memory_limits;
try
{
uncommitted_transactions.pop_back();
uncommitted_state.rollback(rollback_zxid);
}
catch (...)
{
LOG_FATAL(&Poco::Logger::get("KeeperStorage"), "Failed to roll back log. Terminating to avoid inconsistencies");
std::terminate();
}
}
KeeperStorage::Digest KeeperStorage::getNodesDigest(bool committed) const
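MemoryTrackerBlockerInThread above is an RAII guard: while it is alive, memory-limit checks are skipped on the current thread, so the rollback itself cannot fail with a memory-limit exception and hit the std::terminate() path. A hypothetical, much-simplified sketch of such a guard (not the real ClickHouse class):
#include <cstddef>
#include <new>
thread_local bool memory_limit_checks_blocked = false;
struct MemoryLimitBlocker
{
    bool previous;
    MemoryLimitBlocker() : previous(memory_limit_checks_blocked) { memory_limit_checks_blocked = true; }
    ~MemoryLimitBlocker() { memory_limit_checks_blocked = previous; }
};
/// A toy allocation hook: the limit is enforced only when the blocker is not active.
void onAlloc(std::size_t bytes, std::size_t & tracked, std::size_t limit)
{
    tracked += bytes;
    if (!memory_limit_checks_blocked && tracked > limit)
        throw std::bad_alloc();
}
void rollbackSomething(std::size_t & tracked, std::size_t limit)
{
    MemoryLimitBlocker blocker;        /// cleanup below cannot throw because of the limit
    onAlloc(64, tracked, limit);       /// bookkeeping allocations during rollback are tolerated
}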

View File

@ -73,10 +73,11 @@ public:
enum DigestVersion : uint8_t
{
NO_DIGEST = 0,
V0 = 1
V1 = 1,
V2 = 2 // added system nodes that modify the digest on startup so digest from V0 is invalid
};
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V0;
static constexpr auto CURRENT_DIGEST_VERSION = DigestVersion::V2;
struct ResponseForSession
{
@ -380,7 +381,7 @@ public:
int64_t new_last_zxid,
bool check_acl = true,
std::optional<Digest> digest = std::nullopt);
void rollbackRequest(int64_t rollback_zxid);
void rollbackRequest(int64_t rollback_zxid, bool allow_missing);
void finalize();

View File

@ -344,7 +344,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, max_temporary_non_const_columns, 0, "", 0) \
\
M(UInt64, max_subquery_depth, 100, "", 0) \
M(UInt64, max_pipeline_depth, 10000, "", 0) \
M(UInt64, max_pipeline_depth, 1000, "", 0) \
M(UInt64, max_ast_depth, 1000, "Maximum depth of query syntax tree. Checked after parsing.", 0) \
M(UInt64, max_ast_elements, 50000, "Maximum size of query syntax tree in number of nodes. Checked after parsing.", 0) \
M(UInt64, max_expanded_ast_elements, 500000, "Maximum size of query syntax tree in number of nodes after expansion of aliases and the asterisk.", 0) \

View File

@ -1018,6 +1018,14 @@ void BaseDaemon::setupWatchdog()
logger().setChannel(log);
}
/// Concurrent writing of logs to the same file from two threads is questionable on its own,
/// but rotating them from two threads is disastrous.
if (auto * channel = dynamic_cast<DB::OwnSplitChannel *>(logger().getChannel()))
{
channel->setChannelProperty("log", Poco::FileChannel::PROP_ROTATION, "never");
channel->setChannelProperty("log", Poco::FileChannel::PROP_ROTATEONOPEN, "false");
}
logger().information(fmt::format("Will watch for the process with pid {}", pid));
/// Forward signals to the child process.
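The two properties forwarded to the "log" channel above are standard Poco::FileChannel settings. A small standalone sketch of what they control, assuming a bare Poco file channel rather than the ClickHouse logging stack:
#include <Poco/AutoPtr.h>
#include <Poco/FileChannel.h>
#include <Poco/Message.h>
int main()
{
    Poco::AutoPtr<Poco::FileChannel> file(new Poco::FileChannel);
    file->setProperty(Poco::FileChannel::PROP_PATH, "watchdog.log");
    /// Same values as set in the watchdog: never rotate by size or time,
    /// and do not rotate the existing file when the channel is opened.
    file->setProperty(Poco::FileChannel::PROP_ROTATION, "never");
    file->setProperty(Poco::FileChannel::PROP_ROTATEONOPEN, "false");
    file->open();
    file->log(Poco::Message("watchdog", "rotation disabled in this process", Poco::Message::PRIO_INFORMATION));
    file->close();
    return 0;
}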

View File

@ -131,7 +131,6 @@ void SlabsPolygonIndex::indexBuild(const std::vector<Polygon> & polygons)
/** Map of interesting edge ids to the index of left x, the index of right x */
std::vector<size_t> edge_left(m, n), edge_right(m, n);
size_t total_index_edges = 0;
size_t edges_it = 0;
for (size_t l = 0, r = 1; r < sorted_x.size(); ++l, ++r)
{
@ -170,12 +169,10 @@ void SlabsPolygonIndex::indexBuild(const std::vector<Polygon> & polygons)
if (l & 1)
{
edges_index_tree[l++].emplace_back(all_edges[i]);
++total_index_edges;
}
if (r & 1)
{
edges_index_tree[--r].emplace_back(all_edges[i]);
++total_index_edges;
}
}
}

View File

@ -1,4 +1,5 @@
#include "ThreadPoolReader.h"
#include <Common/VersionNumber.h>
#include <Common/assert_cast.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
@ -7,6 +8,7 @@
#include <Common/setThreadName.h>
#include <Common/MemorySanitizer.h>
#include <Common/CurrentThread.h>
#include <Poco/Environment.h>
#include <base/errnoToString.h>
#include <Poco/Event.h>
#include <future>
@ -71,6 +73,16 @@ namespace ErrorCodes
}
#if defined(OS_LINUX)
/// According to man, Linux 5.9 and 5.10 have a bug in preadv2() with the RWF_NOWAIT.
/// https://manpages.debian.org/testing/manpages-dev/preadv2.2.en.html#BUGS
/// We also disable it for older Linux kernels because, according to users' reports, RedHat-patched kernels might also be affected.
static bool hasBugInPreadV2()
{
VersionNumber linux_version(Poco::Environment::osVersion());
return linux_version < VersionNumber{5, 11, 0};
}
#endif
ThreadPoolReader::ThreadPoolReader(size_t pool_size, size_t queue_size_)
: pool(pool_size, pool_size, queue_size_)
@ -88,7 +100,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolReader::submit(Request reques
/// Check if data is already in page cache with preadv2 syscall.
/// We don't want to depend on new Linux kernel.
static std::atomic<bool> has_pread_nowait_support{true};
/// But kernels 5.9 and 5.10 have a bug where preadv2() with the
/// RWF_NOWAIT flag may return 0 even when not at end of file.
/// It can't be distinguished from the real eof, so we have to
/// disable pread with nowait.
static std::atomic<bool> has_pread_nowait_support = !hasBugInPreadV2();
if (has_pread_nowait_support.load(std::memory_order_relaxed))
{
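For context, the page-cache probe guarded by this check looks roughly like the sketch below (Linux only, assuming glibc exposes preadv2() and RWF_NOWAIT; a simplified standalone example, not the ThreadPoolReader code). On kernels 5.9/5.10 the RWF_NOWAIT read may spuriously return 0, which cannot be distinguished from a real EOF, hence the version gate above.
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
#include <sys/uio.h>
#include <fcntl.h>
#include <unistd.h>
#include <cerrno>
#include <cstdio>
int main()
{
    int fd = open("/etc/hostname", O_RDONLY);
    if (fd < 0)
        return 1;
    char buf[4096];
    struct iovec iov{buf, sizeof(buf)};
    ssize_t res = preadv2(fd, &iov, 1, 0, RWF_NOWAIT);
    if (res > 0)
        std::printf("%zd bytes served from the page cache\n", res);
    else if (res == -1 && errno == EAGAIN)
        std::printf("not cached: fall back to a thread-pool read\n");
    else
        std::printf("res=%zd (0 is ambiguous between EOF and the kernel bug)\n", res);
    close(fd);
    return 0;
}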

View File

@ -31,12 +31,14 @@
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeEnum.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnTuple.h>
#include <Columns/ColumnMap.h>
#include <Functions/IFunction.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/PerformanceAdaptors.h>
@ -1085,6 +1087,16 @@ private:
executeForArgument(tuple_types[i].get(), tmp.get(), vec_to, is_first);
}
}
else if (const auto * map = checkAndGetColumn<ColumnMap>(column))
{
const auto & type_map = assert_cast<const DataTypeMap &>(*type);
executeForArgument(type_map.getNestedType().get(), map->getNestedColumnPtr().get(), vec_to, is_first);
}
else if (const auto * const_map = checkAndGetColumnConstData<ColumnMap>(column))
{
const auto & type_map = assert_cast<const DataTypeMap &>(*type);
executeForArgument(type_map.getNestedType().get(), const_map->getNestedColumnPtr().get(), vec_to, is_first);
}
else
{
if (is_first)

View File

@ -42,6 +42,11 @@ public:
return 1;
}
bool useDefaultImplementationForLowCardinalityColumns() const override
{
return false;
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override

View File

@ -1513,8 +1513,7 @@ ActionsDAG::SplitResult ActionsDAG::splitActionsBeforeArrayJoin(const NameSet &
}
auto res = split(split_nodes);
/// Do not remove array joined columns if they are not used.
/// res.first->project_input = false;
res.second->project_input = project_input;
return res;
}

View File

@ -911,7 +911,7 @@ void Aggregator::mergeOnBlockSmall(
mergeStreamsImpl(result.aggregates_pool, *result.NAME, result.NAME->data, \
result.without_key, /* no_more_keys= */ false, \
row_begin, row_end, \
aggregate_columns_data, key_columns);
aggregate_columns_data, key_columns, result.aggregates_pool);
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -2647,19 +2647,23 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
std::unique_ptr<AggregateDataPtr[]> places(new AggregateDataPtr[row_end]);
if (!arena_for_keys)
arena_for_keys = aggregates_pool;
for (size_t i = row_begin; i < row_end; ++i)
{
AggregateDataPtr aggregate_data = nullptr;
if (!no_more_keys)
{
auto emplace_result = state.emplaceKey(data, i, *aggregates_pool);
auto emplace_result = state.emplaceKey(data, i, *arena_for_keys); // NOLINT
if (emplace_result.isInserted())
{
emplace_result.setMapped(nullptr);
@ -2674,7 +2678,7 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
}
else
{
auto find_result = state.findKey(data, i, *aggregates_pool);
auto find_result = state.findKey(data, i, *arena_for_keys);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
}
@ -2703,21 +2707,14 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const
bool no_more_keys,
Arena * arena_for_keys) const
{
const AggregateColumnsConstData & aggregate_columns_data = params.makeAggregateColumnsData(block);
const ColumnRawPtrs & key_columns = params.makeRawKeyColumns(block);
mergeStreamsImpl<Method, Table>(
aggregates_pool,
method,
data,
overflow_row,
no_more_keys,
0,
block.rows(),
aggregate_columns_data,
key_columns);
aggregates_pool, method, data, overflow_row, no_more_keys, 0, block.rows(), aggregate_columns_data, key_columns, arena_for_keys);
}
template <typename Method, typename Table>
@ -2730,12 +2727,15 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const
{
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
mergeStreamsImplCase<false>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns);
mergeStreamsImplCase<true>(
aggregates_pool, method, data, overflow_row, row_begin, row_end, aggregate_columns_data, key_columns, arena_for_keys);
}
@ -3015,17 +3015,26 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
result.keys_size = params.keys_size;
result.key_sizes = key_sizes;
size_t source_rows = 0;
/// In some aggregation methods (e.g. serialized) the aggregates pools are also used to store serialized aggregation keys.
/// The memory occupied by them would then have the same lifetime as the aggregate function states, which is not actually necessary and leads to excessive memory consumption.
/// To avoid this we use a separate arena to allocate memory for aggregation keys. Its memory is freed when this function returns.
auto arena_for_keys = std::make_shared<Arena>();
for (Block & block : blocks)
{
source_rows += block.rows();
if (bucket_num >= 0 && block.info.bucket_num != bucket_num)
bucket_num = -1;
if (result.type == AggregatedDataVariants::Type::without_key || is_overflows)
mergeBlockWithoutKeyStreamsImpl(std::move(block), result);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false);
#define M(NAME, IS_TWO_LEVEL) \
else if (result.type == AggregatedDataVariants::Type::NAME) \
mergeStreamsImpl(std::move(block), result.aggregates_pool, *result.NAME, result.NAME->data, nullptr, false, arena_for_keys.get());
APPLY_FOR_AGGREGATED_VARIANTS(M)
#undef M
@ -3049,9 +3058,15 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
size_t rows = block.rows();
size_t bytes = block.bytes();
double elapsed_seconds = watch.elapsedSeconds();
LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({:.3f} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
LOG_DEBUG(
log,
"Merged partially aggregated blocks for bucket #{}. Got {} rows, {} from {} source rows in {} sec. ({:.3f} rows/sec., {}/sec.)",
bucket_num,
rows,
ReadableSize(bytes),
source_rows,
elapsed_seconds,
rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));
block.info.bucket_num = bucket_num;
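The comment above about arena_for_keys boils down to a lifetime split: serialized key bytes are needed only while the blocks are being merged, while aggregate states live much longer. A toy sketch of that idea (a trivial arena, not ClickHouse's Arena):
#include <cstring>
#include <iostream>
#include <memory>
#include <vector>
class ToyArena
{
public:
    char * alloc(std::size_t n)
    {
        chunks.emplace_back(new char[n]);
        return chunks.back().get();
    }
    /// Everything allocated here is freed when the arena is destroyed.
private:
    std::vector<std::unique_ptr<char[]>> chunks;
};
int main()
{
    ToyArena aggregates_pool;                    /// lives as long as the aggregate states
    {
        ToyArena arena_for_keys;                 /// lives only for the duration of the merge
        char * key = arena_for_keys.alloc(32);
        std::strcpy(key, "serialized aggregation key");
        std::cout << key << '\n';
    }                                            /// key memory is released here, states are not
    (void)aggregates_pool;
    return 0;
}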

View File

@ -1348,8 +1348,11 @@ private:
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const;
/// `arena_for_keys` is used to store serialized aggregation keys (in methods like `serialized`) to save some space.
/// If not provided, aggregates_pool is used instead. Refer to mergeBlocks() for a usage example.
template <typename Method, typename Table>
void mergeStreamsImpl(
Block block,
@ -1357,7 +1360,9 @@ private:
Method & method,
Table & data,
AggregateDataPtr overflow_row,
bool no_more_keys) const;
bool no_more_keys,
Arena * arena_for_keys = nullptr) const;
template <typename Method, typename Table>
void mergeStreamsImpl(
Arena * aggregates_pool,
@ -1368,7 +1373,8 @@ private:
size_t row_begin,
size_t row_end,
const AggregateColumnsConstData & aggregate_columns_data,
const ColumnRawPtrs & key_columns) const;
const ColumnRawPtrs & key_columns,
Arena * arena_for_keys) const;
void mergeBlockWithoutKeyStreamsImpl(
Block block,

View File

@ -367,27 +367,6 @@ public:
// Top-level OpenTelemetry trace context for the query. Makes sense only for a query context.
OpenTelemetryTraceContext query_trace_context;
/// Some counters for current query execution.
/// Most of them are workarounds and should be removed in the future.
struct KitchenSink
{
std::atomic<size_t> analyze_counter = 0;
KitchenSink() = default;
KitchenSink(const KitchenSink & rhs)
: analyze_counter(rhs.analyze_counter.load())
{}
KitchenSink & operator=(const KitchenSink & rhs)
{
analyze_counter = rhs.analyze_counter.load();
return *this;
}
};
KitchenSink kitchen_sink;
private:
using SampleBlockCache = std::unordered_map<std::string, Block>;
mutable SampleBlockCache sample_block_cache;

View File

@ -0,0 +1,44 @@
#pragma once
#include <DataTypes/Serializations/ISerialization.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
namespace DB
{
/// Checks from bottom to top if a function's alias shadows the name
/// of one of its arguments, e.g.
/// SELECT toString(dummy) as dummy FROM system.one GROUP BY dummy;
class FunctionMaskingArgumentCheckMatcher
{
public:
struct Data
{
const String& alias;
bool is_rejected = false;
void reject() { is_rejected = true; }
};
static void visit(const ASTPtr & ast, Data & data)
{
if (data.is_rejected)
return;
if (const auto & identifier = ast->as<ASTIdentifier>())
visit(*identifier, data);
}
static void visit(const ASTIdentifier & ast, Data & data)
{
if (ast.getAliasOrColumnName() == data.alias)
data.reject();
}
static bool needChildVisit(const ASTPtr &, const ASTPtr &) { return true; }
};
using FunctionMaskingArgumentCheckVisitor = ConstInDepthNodeVisitor<FunctionMaskingArgumentCheckMatcher, false>;
}
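A toy stand-in for this check (a simplified tree instead of the ClickHouse AST classes): walk the function's argument subtree and reject the GROUP BY rewrite if any identifier matches the function's alias, as in SELECT toString(dummy) AS dummy ... GROUP BY dummy.
#include <iostream>
#include <memory>
#include <string>
#include <vector>
struct Node
{
    std::string name;                              /// identifier name; empty for non-identifier nodes
    std::vector<std::shared_ptr<Node>> children;
};
bool aliasShadowsArgument(const Node & arguments, const std::string & alias)
{
    if (!arguments.name.empty() && arguments.name == alias)
        return true;
    for (const auto & child : arguments.children)
        if (child && aliasShadowsArgument(*child, alias))
            return true;
    return false;
}
int main()
{
    /// toString(dummy) AS dummy -> the argument subtree contains the identifier "dummy"
    Node args{"", {std::make_shared<Node>(Node{"dummy", {}})}};
    std::cout << aliasShadowsArgument(args, "dummy") << '\n';   /// 1: skip the GROUP BY rewrite
    std::cout << aliasShadowsArgument(args, "other") << '\n';   /// 0: safe to rewrite
    return 0;
}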

View File

@ -98,7 +98,6 @@ namespace ErrorCodes
extern const int SAMPLING_NOT_SUPPORTED;
extern const int ILLEGAL_FINAL;
extern const int ILLEGAL_PREWHERE;
extern const int TOO_DEEP_PIPELINE;
extern const int TOO_MANY_COLUMNS;
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
@ -499,14 +498,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
auto analyze = [&] (bool try_move_to_prewhere)
{
if (context->hasQueryContext())
{
std::atomic<size_t> & current_query_analyze_count = context->getQueryContext()->kitchen_sink.analyze_counter;
++current_query_analyze_count;
if (settings.max_pipeline_depth && current_query_analyze_count >= settings.max_pipeline_depth)
throw DB::Exception(ErrorCodes::TOO_DEEP_PIPELINE, "Query analyze overflow. Try to increase `max_pipeline_depth` or simplify the query");
}
/// Allow push down and other optimizations for VIEW: replace with subquery and rewrite it.
ASTPtr view_table;
if (view)
@ -645,7 +636,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
analyze(shouldMoveToPrewhere());
bool need_analyze_again = false;
if (analysis_result.prewhere_constant_filter_description.always_false || analysis_result.prewhere_constant_filter_description.always_true)
{
if (analysis_result.prewhere_constant_filter_description.always_true)
@ -654,7 +644,6 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query.setExpression(ASTSelectQuery::Expression::PREWHERE, std::make_shared<ASTLiteral>(0u));
need_analyze_again = true;
}
if (analysis_result.where_constant_filter_description.always_false || analysis_result.where_constant_filter_description.always_true)
{
if (analysis_result.where_constant_filter_description.always_true)

View File

@ -229,7 +229,6 @@ bool needRewrite(ASTSelectQuery & select, std::vector<const ASTTableExpression *
return false;
size_t num_array_join = 0;
size_t num_using = 0;
table_expressions.reserve(num_tables);
for (size_t i = 0; i < num_tables; ++i)
@ -256,9 +255,6 @@ bool needRewrite(ASTSelectQuery & select, std::vector<const ASTTableExpression *
const auto & join = table->table_join->as<ASTTableJoin &>();
if (join.kind == JoinKind::Comma)
throw Exception("COMMA to CROSS JOIN rewriter is not enabled or cannot rewrite query", ErrorCodes::NOT_IMPLEMENTED);
if (join.using_expression_list)
++num_using;
}
if (num_tables - num_array_join <= 2)

View File

@ -162,9 +162,8 @@ void getProfileEvents(
dumpMemoryTracker(group_snapshot, columns, server_display_name);
Block curr_block;
size_t rows = 0;
for (; profile_queue->tryPop(curr_block); ++rows)
while (profile_queue->tryPop(curr_block))
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)

View File

@ -182,12 +182,10 @@ void SessionLogElement::appendToBlock(MutableColumns & columns) const
auto & names_col = *settings_tuple_col.getColumnPtr(0)->assumeMutable();
auto & values_col = assert_cast<ColumnString &>(*settings_tuple_col.getColumnPtr(1)->assumeMutable());
size_t items_added = 0;
for (const auto & kv : settings)
{
names_col.insert(kv.first);
values_col.insert(kv.second);
++items_added;
}
auto & offsets = settings_array_col.getOffsets();

View File

@ -13,6 +13,7 @@
#include <Interpreters/AggregateFunctionOfGroupByKeysVisitor.h>
#include <Interpreters/RewriteAnyFunctionVisitor.h>
#include <Interpreters/RemoveInjectiveFunctionsVisitor.h>
#include <Interpreters/FunctionMaskingArgumentCheckVisitor.h>
#include <Interpreters/RedundantFunctionsInOrderByVisitor.h>
#include <Interpreters/RewriteCountVariantsVisitor.h>
#include <Interpreters/MonotonicityCheckVisitor.h>
@ -153,6 +154,19 @@ void optimizeGroupBy(ASTSelectQuery * select_query, ContextPtr context)
continue;
}
}
/// don't optimise functions that shadow any of their arguments, e.g.:
/// SELECT toString(dummy) as dummy FROM system.one GROUP BY dummy;
if (!function->alias.empty())
{
FunctionMaskingArgumentCheckVisitor::Data data{.alias=function->alias};
FunctionMaskingArgumentCheckVisitor(data).visit(function->arguments);
if (data.is_rejected)
{
++i;
continue;
}
}
/// copy shared pointer to args in order to ensure lifetime
auto args_ast = function->arguments;

View File

@ -38,6 +38,12 @@ public:
pChannel->close();
}
void setProperty(const std::string& name, const std::string& value) override
{
if (pChannel)
pChannel->setProperty(name, value);
}
void log(const Poco::Message & msg) override;
void logExtended(const ExtendedLogMessage & msg) override;

View File

@ -169,4 +169,14 @@ void OwnSplitChannel::setLevel(const std::string & name, int level)
}
}
void OwnSplitChannel::setChannelProperty(const std::string& channel_name, const std::string& name, const std::string& value)
{
auto it = channels.find(channel_name);
if (it != channels.end())
{
if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))
channel->setProperty(name, value);
}
}
}

View File

@ -24,6 +24,9 @@ class OwnSplitChannel : public Poco::Channel
public:
/// Makes an extended message from msg and passes it to the client logs queue and child (if possible)
void log(const Poco::Message & msg) override;
void setChannelProperty(const std::string& channel_name, const std::string& name, const std::string& value);
/// Adds a child channel
void addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name);

View File

@ -32,7 +32,7 @@ ASTIdentifier::ASTIdentifier(std::vector<String> && name_parts_, bool special, s
semantic->legacy_compound = true;
if (!name_params.empty())
{
size_t params = 0;
[[maybe_unused]] size_t params = 0;
for (const auto & part [[maybe_unused]] : name_parts)
{
if (part.empty())

View File

@ -344,6 +344,25 @@ const ASTTablesInSelectQueryElement * ASTSelectQuery::join() const
return getFirstTableJoin(*this);
}
bool ASTSelectQuery::hasJoin() const
{
if (!tables())
return false;
const auto & tables_in_select_query = tables()->as<ASTTablesInSelectQuery &>();
if (tables_in_select_query.children.empty())
return false;
for (const auto & child : tables_in_select_query.children)
{
const auto & tables_element = child->as<ASTTablesInSelectQueryElement &>();
if (tables_element.table_join)
return true;
}
return false;
}
static String getTableExpressionAlias(const ASTTableExpression * table_expression)
{
if (table_expression->subquery)

View File

@ -131,6 +131,7 @@ public:
std::pair<ASTPtr, bool> arrayJoinExpressionList() const;
const ASTTablesInSelectQueryElement * join() const;
bool hasJoin() const;
bool final() const;
bool withFill() const;
void replaceDatabaseAndTable(const String & database_name, const String & table_name);

View File

@ -1,9 +1,10 @@
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <limits>
#include <Interpreters/Aggregator.h>
#include <Processors/ISimpleTransform.h>
#include <Processors/ResizeProcessor.h>
#include <Processors/Transforms/AggregatingInOrderTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <QueryPipeline/Pipe.h>
#include <Interpreters/Aggregator.h>
namespace DB
{
@ -367,7 +368,7 @@ SortingAggregatedTransform::SortingAggregatedTransform(size_t num_inputs_, Aggre
: IProcessor(InputPorts(num_inputs_, params_->getHeader()), {params_->getHeader()})
, num_inputs(num_inputs_)
, params(std::move(params_))
, last_bucket_number(num_inputs, -1)
, last_bucket_number(num_inputs, std::numeric_limits<Int32>::min())
, is_input_finished(num_inputs, false)
{
}
@ -462,7 +463,13 @@ IProcessor::Status SortingAggregatedTransform::prepare()
continue;
}
//all_finished = false;
/// We want to keep no more than `num_inputs` buckets in memory (and there will be only a single chunk with a given `bucket_id`).
const bool bucket_from_this_input_still_in_memory = chunks.contains(last_bucket_number[input_num]);
if (bucket_from_this_input_still_in_memory)
{
all_finished = false;
continue;
}
in->setNeeded();

View File

@ -5489,7 +5489,7 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
// In order to properly analyze joins, aliases should be recognized. However, aliases get lost during projection analysis.
// Let's disable projection if there are any JOIN clauses.
// TODO: We need a better identifier resolution mechanism for projection analysis.
if (select_query->join())
if (select_query->hasJoin())
return std::nullopt;
// INTERPOLATE expressions may include aliases, so aliases should be preserved

3
tests/ci/.gitignore vendored Normal file
View File

@ -0,0 +1,3 @@
*_lambda/lambda-venv
*_lambda/lambda-package
*_lambda/lambda-package.zip

View File

@ -206,7 +206,8 @@ Merge it only if you intend to backport changes to the target branch, otherwise
)
self.cherrypick_pr.add_to_labels(Labels.LABEL_CHERRYPICK)
self.cherrypick_pr.add_to_labels(Labels.LABEL_DO_NOT_TEST)
self.cherrypick_pr.add_to_assignees(self.pr.assignee)
if self.pr.assignee is not None:
self.cherrypick_pr.add_to_assignees(self.pr.assignee)
self.cherrypick_pr.add_to_assignees(self.pr.user)
def create_backport(self):
@ -238,7 +239,8 @@ Merge it only if you intend to backport changes to the target branch, otherwise
head=self.backport_branch,
)
self.backport_pr.add_to_labels(Labels.LABEL_BACKPORT)
self.backport_pr.add_to_assignees(self.pr.assignee)
if self.pr.assignee is not None:
self.backport_pr.add_to_assignees(self.pr.assignee)
self.backport_pr.add_to_assignees(self.pr.user)
@property

View File

@ -63,7 +63,7 @@ CI_CONFIG = {
"with_coverage": False,
},
"package_tsan": {
"compiler": "clang-13",
"compiler": "clang-15",
"build_type": "",
"sanitizer": "thread",
"package_type": "deb",

View File

@ -5,11 +5,12 @@ from datetime import date, datetime, timedelta
from pathlib import Path
from os import path as p
from time import sleep
from typing import List, Optional
from typing import List, Optional, Tuple
import github
from github.GithubException import RateLimitExceededException
from github.Issue import Issue
from github.NamedUser import NamedUser
from github.PullRequest import PullRequest
from github.Repository import Repository
@ -120,21 +121,15 @@ class GitHub(github.Github):
return
def get_pull_cached(
self, repo: Repository, number: int, updated_at: Optional[datetime] = None
self, repo: Repository, number: int, obj_updated_at: Optional[datetime] = None
) -> PullRequest:
pr_cache_file = self.cache_path / f"{number}.pickle"
if updated_at is None:
updated_at = datetime.now() - timedelta(hours=-1)
cache_file = self.cache_path / f"pr-{number}.pickle"
def _get_pr(path: Path) -> PullRequest:
with open(path, "rb") as prfd:
return self.load(prfd) # type: ignore
if pr_cache_file.is_file():
cached_pr = _get_pr(pr_cache_file)
if updated_at <= cached_pr.updated_at:
if cache_file.is_file():
is_updated, cached_pr = self._is_cache_updated(cache_file, obj_updated_at)
if is_updated:
logger.debug("Getting PR #%s from cache", number)
return cached_pr
return cached_pr # type: ignore
logger.debug("Getting PR #%s from API", number)
for i in range(self.retries):
try:
@ -144,11 +139,56 @@ class GitHub(github.Github):
if i == self.retries - 1:
raise
self.sleep_on_rate_limit()
logger.debug("Caching PR #%s from API in %s", number, pr_cache_file)
with open(pr_cache_file, "wb") as prfd:
logger.debug("Caching PR #%s from API in %s", number, cache_file)
with open(cache_file, "wb") as prfd:
self.dump(pr, prfd) # type: ignore
return pr
def get_user_cached(
self, login: str, obj_updated_at: Optional[datetime] = None
) -> NamedUser:
cache_file = self.cache_path / f"user-{login}.pickle"
if cache_file.is_file():
is_updated, cached_user = self._is_cache_updated(cache_file, obj_updated_at)
if is_updated:
logger.debug("Getting user %s from cache", login)
return cached_user # type: ignore
logger.debug("Getting PR #%s from API", login)
for i in range(self.retries):
try:
user = self.get_user(login)
break
except RateLimitExceededException:
if i == self.retries - 1:
raise
self.sleep_on_rate_limit()
logger.debug("Caching user %s from API in %s", login, cache_file)
with open(cache_file, "wb") as prfd:
self.dump(user, prfd) # type: ignore
return user
def _get_cached(self, path: Path):
with open(path, "rb") as ob_fd:
return self.load(ob_fd) # type: ignore
def _is_cache_updated(
self, cache_file: Path, obj_updated_at: Optional[datetime]
) -> Tuple[bool, object]:
cached_obj = self._get_cached(cache_file)
# We don't want cache_updated to always be old,
# for example when the user has not been updated for ages
cache_updated = max(
datetime.fromtimestamp(cache_file.stat().st_mtime), cached_obj.updated_at
)
if obj_updated_at is None:
# When we don't know whether the object has been updated or not,
# we refresh it at most once per hour
obj_updated_at = datetime.now() - timedelta(hours=1)
if obj_updated_at <= cache_updated:
return True, cached_obj
return False, cached_obj
@property
def cache_path(self):
return self._cache_path

View File

@ -335,7 +335,7 @@ class Release:
yield
except (Exception, KeyboardInterrupt):
logging.warning("Rolling back checked out %s for %s", ref, orig_ref)
self.run(f"git reset --hard; git checkout {orig_ref}")
self.run(f"git reset --hard; git checkout -f {orig_ref}")
raise
else:
if with_checkout_back and need_rollback:

View File

@ -1,2 +0,0 @@
lambda-venv
lambda-package.zip

View File

@ -1,15 +1,24 @@
#!/usr/bin/env bash
set -xeo pipefail
WORKDIR=$(dirname "$0")
cd "$WORKDIR"
PY_EXEC=python3.9
LAMBDA_NAME=$(basename "$PWD")
LAMBDA_NAME=${LAMBDA_NAME//_/-}
VENV=lambda-venv
py_exec=$(which python3)
py_version=$(basename "$(readlink -f "$py_exec")")
rm -rf "$VENV" lambda-package.zip
virtualenv "$VENV"
"$PY_EXEC" -m venv "$VENV"
#virtualenv "$VENV"
# shellcheck disable=SC1091
source "$VENV/bin/activate"
pip install -r requirements.txt
PACKAGES="$VENV/lib/$py_version/site-packages"
cp app.py "$PACKAGES/"
( cd "$PACKAGES" && zip -r ../../../../lambda-package.zip . )
PACKAGE=lambda-package
rm -rf "$PACKAGE" "$PACKAGE".zip
cp -r "$VENV/lib/$PY_EXEC/site-packages" "$PACKAGE"
cp app.py "$PACKAGE"
rm -r "$PACKAGE"/{pip,pip-*,setuptools,setuptools-*}
( cd "$PACKAGE" && zip -r ../"$PACKAGE".zip . )
aws lambda update-function-code --function-name team-keys-lambda --zip-file fileb://lambda-package.zip
aws lambda update-function-code --function-name "$LAMBDA_NAME" --zip-file fileb://"$PACKAGE".zip

View File

@ -1,13 +0,0 @@
FROM public.ecr.aws/lambda/python:3.9
# Install the function's dependencies using file requirements.txt
# from your project folder.
COPY requirements.txt .
RUN pip3 install -r requirements.txt --target "${LAMBDA_TASK_ROOT}"
# Copy function code
COPY app.py ${LAMBDA_TASK_ROOT}
# Set the CMD to your handler (could also be done as a parameter override outside of the Dockerfile)
CMD [ "app.handler" ]

View File

@ -402,10 +402,27 @@ def main(event):
workflow_description = get_workflow_description_from_event(event_data)
print("Got workflow description", workflow_description)
if (
workflow_description.action == "completed"
and workflow_description.conclusion == "failure"
):
if workflow_description.action == "completed":
attempt = 0
# The nice and reliable GH API sends such events from time to time, e.g.:
# action='completed', conclusion=None, status='in_progress',
# so let's try to receive the real workflow data
while workflow_description.conclusion is None and attempt < MAX_RETRY:
progressive_sleep = 3 * sum(i + 1 for i in range(attempt))
time.sleep(progressive_sleep)
event_data["workflow_run"] = _exec_get_with_retry(
workflow_description.api_url
)
workflow_description = get_workflow_description_from_event(event_data)
attempt += 1
if workflow_description.conclusion != "failure":
print(
"Workflow finished with status "
f"{workflow_description.conclusion}, exiting"
)
return
print(
"Workflow",
workflow_description.url,
@ -438,11 +455,9 @@ def main(event):
)
print("Got pull requests for workflow", len(pull_requests))
if len(pull_requests) > 1:
raise Exception("Received more than one PR for workflow run")
if len(pull_requests) < 1:
raise Exception("Cannot find any pull requests for workflow run")
if len(pull_requests) != 1:
print(f"Can't continue with non-uniq PRs: {pull_requests}")
return
pull_request = pull_requests[0]
print("Pull request for workflow number", pull_request["number"])
@ -467,4 +482,8 @@ def main(event):
def handler(event, _):
main(event)
try:
main(event)
except Exception:
print("Received event: ", event)
raise

View File

@ -0,0 +1 @@
../team_keys_lambda/build_and_deploy_archive.sh

View File

@ -1,2 +1,22 @@
49999995000000 10000000
499999500000 1000000 15
100033 2
100034 2
100035 2
100036 2
100037 2
100038 2
100039 2
10004 2
100040 2
100041 2
100033 2
100034 2
100035 2
100036 2
100037 2
100038 2
100039 2
10004 2
100040 2
100041 2

View File

@ -7,3 +7,18 @@ SET group_by_two_level_threshold_bytes = 50000000;
SELECT sum(k), sum(c) FROM (SELECT number AS k, count() AS c FROM (SELECT * FROM system.numbers LIMIT 10000000) GROUP BY k);
SELECT sum(k), sum(c), max(u) FROM (SELECT number AS k, count() AS c, uniqArray(range(number % 16)) AS u FROM (SELECT * FROM system.numbers LIMIT 1000000) GROUP BY k);
SET group_by_two_level_threshold = 100000;
SET max_bytes_before_external_group_by = '1Mi';
-- method: key_string & key_string_two_level
CREATE TABLE t_00284_str(s String) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
INSERT INTO t_00284_str SELECT toString(number) FROM numbers_mt(1e6);
SELECT s, count() FROM t_00284_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;
-- method: low_cardinality_key_string & low_cardinality_key_string_two_level
CREATE TABLE t_00284_lc_str(s LowCardinality(String)) ENGINE = MergeTree() ORDER BY tuple();
INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
INSERT INTO t_00284_lc_str SELECT toString(number) FROM numbers_mt(1e6);
SELECT s, count() FROM t_00284_lc_str GROUP BY s ORDER BY s LIMIT 10 OFFSET 42;

View File

@ -28,3 +28,8 @@ WHERE number IN
SELECT number
FROM numbers(5)
) order by label, number;
SELECT NULL FROM
(SELECT [1048575, NULL] AS ax, 2147483648 AS c) t1 ARRAY JOIN ax
INNER JOIN (SELECT NULL AS c) t2 USING (c);

View File

@ -1,5 +1,5 @@
#!/usr/bin/env bash
# Tags: disabled
# Tags: long, no-parallel
# Tag: no-parallel - too heavy
# Tag: long - too heavy
@ -33,11 +33,14 @@ $CLICKHOUSE_CLIENT "${client_opts[@]}" -nm -q "
create materialized view mv_02232 to out_02232 as select * from in_02232;
"
# 600 is the default timeout of clickhouse-test, and 30 is just a safe padding
# to avoid triggering the hung-query check
insert_timeout=$((600-30))
# Increase timeouts to avoid a timeout while trying to send the Log packet to
# the remote side when the socket is full.
insert_client_opts=(
# Increase timeouts to avoid a timeout while trying to send the Log packet to
# the remote side when the socket is full.
--send_timeout 86400
--receive_timeout 86400
--send_timeout "$insert_timeout"
--receive_timeout "$insert_timeout"
)
# 250 seconds is enough to trigger the query hung (even in debug build)
#

View File

@ -1,5 +1,19 @@
arrays
14617701568871014978
12913842429399915005
8351543757058688770
12732328028874882204
12371801021764949421 Array(Tuple(UInt8, Array(Tuple(UInt8, Tuple(UInt8, UInt8, Array(Tuple(UInt8, UInt8)))))))
14617701568871014978
12913842429399915005
8351543757058688770
12732328028874882204
maps
14617701568871014978
12913842429399915005
8351543757058688770
12732328028874882204
14617701568871014978
12913842429399915005
8351543757058688770
12732328028874882204

View File

@ -1,6 +1,25 @@
SELECT 'arrays';
SELECT cityHash64([(1, 'a'), (2, 'b')]);
SELECT cityHash64([(1, 'c'), (2, 'b')]);
SELECT sipHash64([(1, 'a'), (2, 'b')]);
SELECT murmurHash2_64([(1, 'a'), (2, 'b'), (3, 'c')]);
SELECT cityHash64([(1, [(1, (3, 4, [(5, 6), (7, 8)]))]), (2, [])] AS c), toTypeName(c);
SELECT cityHash64(materialize([(1, 'a'), (2, 'b')]));
SELECT cityHash64(materialize([(1, 'c'), (2, 'b')]));
SELECT sipHash64(materialize([(1, 'a'), (2, 'b')]));
SELECT murmurHash2_64(materialize([(1, 'a'), (2, 'b'), (3, 'c')]));
SELECT 'maps';
SELECT cityHash64(map(1, 'a', 2, 'b'));
SELECT cityHash64(map(1, 'c', 2, 'b'));
SELECT sipHash64(map(1, 'a', 2, 'b'));
SELECT murmurHash2_64(map(1, 'a', 2, 'b', 3, 'c'));
SELECT cityHash64(materialize(map(1, 'a', 2, 'b')));
SELECT cityHash64(materialize(map(1, 'c', 2, 'b')));
SELECT sipHash64(materialize(map(1, 'a', 2, 'b')));
SELECT murmurHash2_64(materialize(map(1, 'a', 2, 'b', 3, 'c')));
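These tests rely on the FunctionsHashing change earlier in this diff: a Map column is hashed through its nested Array(Tuple(key, value)) column, so map(1, 'a', 2, 'b') and [(1, 'a'), (2, 'b')] hash identically. A toy C++ illustration of the idea, with a hand-rolled hash combiner rather than ClickHouse's implementation:
#include <cstdint>
#include <functional>
#include <iostream>
#include <map>
#include <string>
#include <utility>
#include <vector>
using Pairs = std::vector<std::pair<int, std::string>>;
std::uint64_t hashPairs(const Pairs & pairs)
{
    std::uint64_t h = 14695981039346656037ULL;
    for (const auto & [k, v] : pairs)
    {
        h = (h ^ std::hash<int>{}(k)) * 1099511628211ULL;
        h = (h ^ std::hash<std::string>{}(v)) * 1099511628211ULL;
    }
    return h;
}
int main()
{
    Pairs array_of_tuples = {{1, "a"}, {2, "b"}};          /// [(1,'a'),(2,'b')]
    std::map<int, std::string> m = {{1, "a"}, {2, "b"}};   /// map(1,'a',2,'b')
    Pairs nested_view(m.begin(), m.end());                 /// the Array(Tuple) column behind the Map
    std::cout << (hashPairs(array_of_tuples) == hashPairs(nested_view)) << '\n';  /// prints 1
    return 0;
}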

View File

@ -20,17 +20,17 @@ clickhouse-client --query_kind initial_query -q explain plan header=1 select toS
Expression ((Projection + Before ORDER BY))
Header: dummy String
Aggregating
Header: dummy UInt8
Header: toString(dummy) String
Expression (Before GROUP BY)
Header: dummy UInt8
Header: toString(dummy) String
ReadFromStorage (SystemOne)
Header: dummy UInt8
clickhouse-local --query_kind initial_query -q explain plan header=1 select toString(dummy) as dummy from system.one group by dummy
Expression ((Projection + Before ORDER BY))
Header: dummy String
Aggregating
Header: dummy UInt8
Header: toString(dummy) String
Expression (Before GROUP BY)
Header: dummy UInt8
Header: toString(dummy) String
ReadFromStorage (SystemOne)
Header: dummy UInt8

View File

@ -1,15 +0,0 @@
-- Tags: long
-- https://github.com/ClickHouse/ClickHouse/issues/21557
SET max_pipeline_depth = 1000;
EXPLAIN SYNTAX
WITH
x AS ( SELECT number FROM numbers(10) ),
cross_sales AS (
SELECT 1 AS xx
FROM x, x AS d1, x AS d2, x AS d3, x AS d4, x AS d5, x AS d6, x AS d7, x AS d8, x AS d9
WHERE x.number = d9.number
)
SELECT xx FROM cross_sales WHERE xx = 2000; -- { serverError TOO_DEEP_PIPELINE }

View File

@ -0,0 +1,11 @@
-- { echoOn }
SELECT toString(dummy) as dummy FROM remote('127.{1,1}', 'system.one') GROUP BY dummy;
0
SELECT toString(dummy+1) as dummy FROM remote('127.{1,1}', 'system.one') GROUP BY dummy;
1
SELECT toString((toInt8(dummy)+2) * (toInt8(dummy)+2)) as dummy FROM remote('127.{1,1}', system.one) GROUP BY dummy;
4
SELECT round(number % 3) AS number FROM remote('127.{1,1}', numbers(20)) GROUP BY number ORDER BY number ASC;
0
1
2

View File

@ -0,0 +1,5 @@
-- { echoOn }
SELECT toString(dummy) as dummy FROM remote('127.{1,1}', 'system.one') GROUP BY dummy;
SELECT toString(dummy+1) as dummy FROM remote('127.{1,1}', 'system.one') GROUP BY dummy;
SELECT toString((toInt8(dummy)+2) * (toInt8(dummy)+2)) as dummy FROM remote('127.{1,1}', system.one) GROUP BY dummy;
SELECT round(number % 3) AS number FROM remote('127.{1,1}', numbers(20)) GROUP BY number ORDER BY number ASC;

View File

@ -0,0 +1,23 @@
-- Tags: long, no-tsan, no-msan, no-asan, no-ubsan
create table t_2354_dist_with_external_aggr(a UInt64, b String, c FixedString(100)) engine = MergeTree order by tuple();
insert into t_2354_dist_with_external_aggr select number, toString(number) as s, toFixedString(s, 100) from numbers_mt(5e7);
set max_bytes_before_external_group_by = '2G',
max_threads = 16,
aggregation_memory_efficient_merge_threads = 16,
distributed_aggregation_memory_efficient = 1,
prefer_localhost_replica = 1,
group_by_two_level_threshold = 100000;
select a, b, c, sum(a) as s
from remote('127.0.0.{1,2}', currentDatabase(), t_2354_dist_with_external_aggr)
group by a, b, c
format Null;
system flush logs;
select memory_usage < 4 * 1024 * 1024 * 1024 -- whole aggregation state of local aggregation uncompressed is 5.8G
from system.query_log
where event_time >= now() - interval '15 minute' and type = 'QueryFinish' and is_initial_query and query like '%t_2354_dist_with_external_aggr%group_by%' and current_database = currentDatabase();

View File

@ -0,0 +1,54 @@
#!/usr/bin/env bash
# Tags: no-parallel
set -e
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
echo "<clickhouse>
<logger>
<level>trace</level>
<console>true</console>
</logger>
<tcp_port>9000</tcp_port>
<path>./</path>
<mark_cache_size>0</mark_cache_size>
<user_directories>
<users_xml>
<!-- Path to configuration file with predefined users. -->
<path>users.xml</path>
</users_xml>
</user_directories>
</clickhouse>" > $CUR_DIR/config.xml
echo "<clickhouse>
<profiles>
<default></default>
</profiles>
<users>
<default>
<password></password>
<networks>
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
</users>
<quotas>
<default></default>
</quotas>
</clickhouse>" > $CUR_DIR/users.xml
local_opts=(
"--config-file=$CUR_DIR/config.xml"
"--send_logs_level=none")
${CLICKHOUSE_LOCAL} "${local_opts[@]}" --query 'Select 1' |& grep -v -e 'Processing configuration file'
rm -rf $CUR_DIR/users.xml
rm -rf $CUR_DIR/config.xml

View File

@ -0,0 +1 @@
a

View File

@ -0,0 +1 @@
WITH ( SELECT toLowCardinality('a') ) AS bar SELECT bar

View File

@ -26,6 +26,20 @@ query
| truncateStmt // DDL
| useStmt
| watchStmt
| ctes? selectStmt
;
// CTE statement
ctes
: WITH namedQuery (',' namedQuery)*
;
namedQuery
: name=identifier (columnAliases)? AS '(' query ')'
;
columnAliases
: '(' identifier (',' identifier)* ')'
;
// ALTER statement

View File

@ -6,20 +6,14 @@ import logging
import os.path as p
import os
import re
from datetime import date, datetime, timedelta
from queue import Empty, Queue
from datetime import date, timedelta
from subprocess import CalledProcessError, DEVNULL
from threading import Thread
from time import sleep
from typing import Dict, List, Optional, TextIO
from fuzzywuzzy.fuzz import ratio # type: ignore
from github import Github
from github_helper import GitHub, PullRequest, PullRequests, Repository
from github.GithubException import RateLimitExceededException, UnknownObjectException
from github.NamedUser import NamedUser
from github.Issue import Issue
from github.PullRequest import PullRequest
from github.Repository import Repository
from git_helper import is_shallow, git_runner as runner
# This array gives the preferred category order, and is also used to
@ -39,7 +33,7 @@ categories_preferred_order = (
FROM_REF = ""
TO_REF = ""
SHA_IN_CHANGELOG = [] # type: List[str]
GitHub = Github()
gh = GitHub()
CACHE_PATH = p.join(p.dirname(p.realpath(__file__)), "gh_cache")
@ -49,7 +43,7 @@ class Description:
):
self.number = number
self.html_url = html_url
self.user = user
self.user = gh.get_user_cached(user._rawData["login"]) # type: ignore
self.entry = entry
self.category = category
@ -78,7 +72,7 @@ class Description:
user_name = self.user.login
break
except RateLimitExceededException:
sleep_on_rate_limit()
gh.sleep_on_rate_limit()
return (
f"* {entry} [#{self.number}]({self.html_url}) "
f"([{user_name}]({self.user.html_url}))."
@ -94,85 +88,34 @@ class Description:
return self.number < other.number
class Worker(Thread):
def __init__(self, request_queue: Queue, repo: Repository):
Thread.__init__(self)
self.queue = request_queue
self.repo = repo
self.response = [] # type: List[Description]
def run(self):
while not self.queue.empty():
try:
issue = self.queue.get() # type: Issue
except Empty:
break # possible race condition, just continue
api_pr = get_pull_cached(self.repo, issue.number, issue.updated_at)
in_changelog = False
merge_commit = api_pr.merge_commit_sha
try:
runner.run(f"git rev-parse '{merge_commit}'")
except CalledProcessError:
# It's possible that commit not in the repo, just continue
logging.info("PR %s does not belong to the repo", api_pr.number)
continue
in_changelog = merge_commit in SHA_IN_CHANGELOG
if in_changelog:
desc = generate_description(api_pr, self.repo)
if desc is not None:
self.response.append(desc)
self.queue.task_done()
def sleep_on_rate_limit(time: int = 20):
logging.warning("Faced rate limit, sleeping %s", time)
sleep(time)
def get_pull_cached(
repo: Repository, number: int, updated_at: Optional[datetime] = None
) -> PullRequest:
pr_cache_file = p.join(CACHE_PATH, f"{number}.pickle")
if updated_at is None:
updated_at = datetime.now() - timedelta(hours=-1)
if p.isfile(pr_cache_file):
cache_updated = datetime.fromtimestamp(p.getmtime(pr_cache_file))
if cache_updated > updated_at:
with open(pr_cache_file, "rb") as prfd:
return GitHub.load(prfd) # type: ignore
while True:
try:
pr = repo.get_pull(number)
break
except RateLimitExceededException:
sleep_on_rate_limit()
with open(pr_cache_file, "wb") as prfd:
GitHub.dump(pr, prfd) # type: ignore
return pr
def get_descriptions(
repo: Repository, issues: List[Issue], jobs: int
) -> Dict[str, List[Description]]:
workers = [] # type: List[Worker]
queue = Queue() # type: Queue[Issue]
for issue in issues:
queue.put(issue)
for _ in range(jobs):
worker = Worker(queue, repo)
worker.start()
workers.append(worker)
def get_descriptions(prs: PullRequests) -> Dict[str, List[Description]]:
descriptions = {} # type: Dict[str, List[Description]]
for worker in workers:
worker.join()
for desc in worker.response:
if desc.category not in descriptions:
descriptions[desc.category] = []
descriptions[desc.category].append(desc)
repos = {} # type: Dict[str, Repository]
for pr in prs:
# See https://github.com/PyGithub/PyGithub/issues/2202:
# obj._rawData doesn't issue additional API requests,
# so we'll save some requests
# pylint: disable=protected-access
repo_name = pr._rawData["base"]["repo"]["full_name"] # type: ignore
# pylint: enable=protected-access
if repo_name not in repos:
repos[repo_name] = pr.base.repo
in_changelog = False
merge_commit = pr.merge_commit_sha
try:
runner.run(f"git rev-parse '{merge_commit}'")
except CalledProcessError:
# It's possible that the commit is not in the repo, just continue
logging.info("PR %s does not belong to the repo", pr.number)
continue
in_changelog = merge_commit in SHA_IN_CHANGELOG
if in_changelog:
desc = generate_description(pr, repos[repo_name])
if desc is not None:
if desc.category not in descriptions:
descriptions[desc.category] = []
descriptions[desc.category].append(desc)
for descs in descriptions.values():
descs.sort()
@ -193,6 +136,11 @@ def parse_args() -> argparse.Namespace:
default=0,
help="set the script verbosity, could be used multiple",
)
parser.add_argument(
"--debug-helpers",
action="store_true",
help="add debug logging for git_helper and github_helper",
)
parser.add_argument(
"--output",
type=argparse.FileType("w"),
@ -246,7 +194,7 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri
branch_parts = item.head.ref.split("/")
if len(branch_parts) == 3:
try:
item = get_pull_cached(repo, int(branch_parts[-1]))
item = gh.get_pull_cached(repo, int(branch_parts[-1]))
except Exception as e:
logging.warning("unable to get backpoted PR, exception: %s", e)
else:
@ -337,9 +285,13 @@ def generate_description(item: PullRequest, repo: Repository) -> Optional[Descri
def write_changelog(fd: TextIO, descriptions: Dict[str, List[Description]]):
year = date.today().year
to_commit = runner(f"git rev-parse {TO_REF}^{{}}")[:11]
from_commit = runner(f"git rev-parse {FROM_REF}^{{}}")[:11]
fd.write(
f"---\nsidebar_position: 1\nsidebar_label: {year}\n---\n\n# {year} Changelog\n\n"
f"### ClickHouse release {TO_REF} FIXME as compared to {FROM_REF}\n\n"
f"---\nsidebar_position: 1\nsidebar_label: {year}\n---\n\n"
f"# {year} Changelog\n\n"
f"### ClickHouse release {TO_REF} ({to_commit}) FIXME "
f"as compared to {FROM_REF} ({from_commit})\n\n"
)
seen_categories = [] # type: List[str]
@ -391,12 +343,15 @@ def set_sha_in_changelog():
def main():
log_levels = [logging.CRITICAL, logging.WARN, logging.INFO, logging.DEBUG]
log_levels = [logging.WARN, logging.INFO, logging.DEBUG]
args = parse_args()
logging.basicConfig(
format="%(asctime)s %(levelname)-8s [%(filename)s:%(lineno)d]:\n%(message)s",
level=log_levels[min(args.verbose, 3)],
level=log_levels[min(args.verbose, 2)],
)
if args.debug_helpers:
logging.getLogger("github_helper").setLevel(logging.DEBUG)
logging.getLogger("git_helper").setLevel(logging.DEBUG)
# Create a cache directory
if not p.isdir(CACHE_PATH):
os.mkdir(CACHE_PATH, 0o700)
@ -413,35 +368,29 @@ def main():
logging.info("Using %s..%s as changelog interval", FROM_REF, TO_REF)
# use merge-base commit as a starting point, if used ref in another branch
base_commit = runner.run(f"git merge-base '{FROM_REF}^{{}}' '{TO_REF}^{{}}'")
# Get starting and ending dates for gathering PRs
# Add one day before and after to mitigate possible TZ issues
# the `tag^{}` format gives the commit ref when we have annotated tags
# the %cs format gives the committer date, which works better for cherry-picked commits
from_date = runner.run(f"git log -1 --format=format:%cs '{FROM_REF}^{{}}'")
from_date = (date.fromisoformat(from_date) - timedelta(1)).isoformat()
from_date = runner.run(f"git log -1 --format=format:%cs '{base_commit}'")
to_date = runner.run(f"git log -1 --format=format:%cs '{TO_REF}^{{}}'")
to_date = (date.fromisoformat(to_date) + timedelta(1)).isoformat()
merged = (
date.fromisoformat(from_date) - timedelta(1),
date.fromisoformat(to_date) + timedelta(1),
)
# Get all PRs for the given time frame
global GitHub
GitHub = Github(
global gh
gh = GitHub(
args.gh_user_or_token, args.gh_password, per_page=100, pool_size=args.jobs
)
query = f"type:pr repo:{args.repo} is:merged merged:{from_date}..{to_date}"
repo = GitHub.get_repo(args.repo)
api_prs = GitHub.search_issues(query=query, sort="created")
logging.info("Found %s PRs for the query: '%s'", api_prs.totalCount, query)
gh.cache_path = CACHE_PATH
query = f"type:pr repo:{args.repo} is:merged"
prs = gh.get_pulls_from_search(query=query, merged=merged, sort="created")
issues = [] # type: List[Issue]
while True:
try:
for issue in api_prs:
issues.append(issue)
break
except RateLimitExceededException:
sleep_on_rate_limit()
descriptions = get_descriptions(repo, issues, args.jobs)
descriptions = get_descriptions(prs)
write_changelog(args.output, descriptions)

View File

@ -0,0 +1 @@
../../tests/ci/github_helper.py

View File

@ -1,3 +1,4 @@
v22.7.2.15-stable 2022-08-03
v22.7.1.2484-stable 2022-07-21
v22.6.4.35-stable 2022-07-25
v22.6.3.35-stable 2022-07-06
@ -11,6 +12,7 @@ v22.4.5.9-stable 2022-05-06
v22.4.4.7-stable 2022-04-29
v22.4.3.3-stable 2022-04-26
v22.4.2.1-stable 2022-04-22
v22.3.10.22-lts 2022-08-03
v22.3.9.19-lts 2022-07-25
v22.3.8.39-lts 2022-07-07
v22.3.7.28-lts 2022-06-20


View File

@ -1,3 +1,7 @@
padding=" "
sz="$(stat -c %s 'decompressor')"
if [[ $OSTYPE == 'darwin'* ]]; then
sz="$(stat -f %z 'decompressor')"
else
sz="$(stat -c %s 'decompressor')"
fi
printf "%s%s" "${padding:${#sz}}" $sz

View File

@ -282,11 +282,9 @@ void createConcurrent(zkutil::ZooKeeper & testzk, const std::string & zkhost, si
asyncs.push_back(std::async(std::launch::async, callback));
}
size_t i = 0;
for (auto & async : asyncs)
{
async.wait();
i++;
}
}