Merge branch 'master' into hdfs_config_prefix

This commit is contained in:
taiyang-li 2022-03-29 18:32:59 +08:00
commit 77314246f3
123 changed files with 1270 additions and 383 deletions

View File

@ -210,3 +210,6 @@ CheckOptions:
value: false
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: false
# Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097
- key: readability-identifier-naming.TypeTemplateParameterIgnoredRegexp
value: expr-type

View File

@ -360,6 +360,52 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinGCC:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=binary_gcc
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderDebAsan:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
@ -918,6 +964,7 @@ jobs:
- BuilderDebRelease
- BuilderDebAarch64
- BuilderBinRelease
- BuilderBinGCC
- BuilderDebAsan
- BuilderDebTsan
- BuilderDebUBsan
@ -2608,6 +2655,40 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsReleaseGCC:
needs: [BuilderBinGCC]
runs-on: [self-hosted, fuzzer-unit-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-gcc, actions)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Unit test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 unit_tests_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, fuzzer-unit-tester]

View File

@ -370,6 +370,48 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinGCC:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=binary_gcc
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderDebAarch64:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
@ -963,6 +1005,7 @@ jobs:
- BuilderDebRelease
- BuilderDebAarch64
- BuilderBinRelease
- BuilderBinGCC
- BuilderDebAsan
- BuilderDebTsan
- BuilderDebUBsan
@ -2808,6 +2851,40 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsReleaseGCC:
needs: [BuilderBinGCC]
runs-on: [self-hosted, fuzzer-unit-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-gcc, actions)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Unit test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 unit_tests_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, fuzzer-unit-tester]

View File

@ -261,7 +261,7 @@ endif ()
# Add a section with the hash of the compiled machine code for integrity checks.
# Only for official builds, because adding a section can be time-consuming (rewrite of several GB).
# And cross-compiled binaries are not supported (since you cannot execute clickhouse hash-binary)
if (OBJCOPY_PATH AND YANDEX_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE))
if (OBJCOPY_PATH AND CLICKHOUSE_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE))
set (USE_BINARY_HASH 1)
endif ()

View File

@ -51,6 +51,6 @@ if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
elseif (YANDEX_OFFICIAL_BUILD)
elseif (CLICKHOUSE_OFFICIAL_BUILD)
message (WARNING "Option GLIBC_COMPATIBILITY must be turned on for production builds.")
endif ()

View File

@ -18,6 +18,6 @@ set (VERSION_STRING_SHORT "${VERSION_MAJOR}.${VERSION_MINOR}")
math (EXPR VERSION_INTEGER "${VERSION_PATCH} + ${VERSION_MINOR}*1000 + ${VERSION_MAJOR}*1000000")
if(YANDEX_OFFICIAL_BUILD)
if(CLICKHOUSE_OFFICIAL_BUILD)
set(VERSION_OFFICIAL " (official build)")
endif()

View File

@ -69,9 +69,10 @@ endif ()
target_compile_options(_avrocpp PRIVATE ${SUPPRESS_WARNINGS})
# create a symlink to include headers with <avro/...>
set(AVRO_INCLUDE_DIR "${CMAKE_CURRENT_BINARY_DIR}/include")
ADD_CUSTOM_TARGET(avro_symlink_headers ALL
COMMAND ${CMAKE_COMMAND} -E make_directory "${AVROCPP_ROOT_DIR}/include"
COMMAND ${CMAKE_COMMAND} -E create_symlink "${AVROCPP_ROOT_DIR}/api" "${AVROCPP_ROOT_DIR}/include/avro"
COMMAND ${CMAKE_COMMAND} -E make_directory "${AVRO_INCLUDE_DIR}"
COMMAND ${CMAKE_COMMAND} -E create_symlink "${AVROCPP_ROOT_DIR}/api" "${AVRO_INCLUDE_DIR}/avro"
)
add_dependencies(_avrocpp avro_symlink_headers)
target_include_directories(_avrocpp SYSTEM BEFORE PUBLIC "${AVROCPP_ROOT_DIR}/include")
target_include_directories(_avrocpp SYSTEM BEFORE PUBLIC "${AVRO_INCLUDE_DIR}")

View File

@ -27,7 +27,11 @@ target_include_directories (_boost_headers_only SYSTEM BEFORE INTERFACE ${LIBRAR
# asio
target_compile_definitions (_boost_headers_only INTERFACE BOOST_ASIO_STANDALONE=1)
target_compile_definitions (_boost_headers_only INTERFACE
BOOST_ASIO_STANDALONE=1
# Avoid using std::result_of, which is deprecated since C++17
BOOST_ASIO_HAS_STD_INVOKE_RESULT=1
)
# iostreams

contrib/hyperscan vendored

@ -1 +1 @@
Subproject commit e9f08df0213fc637aac0a5bbde9beeaeba2fe9fa
Subproject commit 5edc68c5ac68d2d4f876159e9ee84def6d3dc87c

contrib/libcxx vendored

@ -1 +1 @@
Subproject commit 61e60294b1de01483caa9f5d00f437c99b674de6
Subproject commit 172b2ae074f6755145b91c53a95c8540c1468239

View File

@ -18,12 +18,14 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/filesystem/directory_iterator.cpp"
"${LIBCXX_SOURCE_DIR}/src/filesystem/int128_builtins.cpp"
"${LIBCXX_SOURCE_DIR}/src/filesystem/operations.cpp"
"${LIBCXX_SOURCE_DIR}/src/format.cpp"
"${LIBCXX_SOURCE_DIR}/src/functional.cpp"
"${LIBCXX_SOURCE_DIR}/src/future.cpp"
"${LIBCXX_SOURCE_DIR}/src/hash.cpp"
"${LIBCXX_SOURCE_DIR}/src/ios.cpp"
"${LIBCXX_SOURCE_DIR}/src/ios.instantiations.cpp"
"${LIBCXX_SOURCE_DIR}/src/iostream.cpp"
"${LIBCXX_SOURCE_DIR}/src/legacy_pointer_safety.cpp"
"${LIBCXX_SOURCE_DIR}/src/locale.cpp"
"${LIBCXX_SOURCE_DIR}/src/memory.cpp"
"${LIBCXX_SOURCE_DIR}/src/mutex.cpp"
@ -33,6 +35,9 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/random.cpp"
"${LIBCXX_SOURCE_DIR}/src/random_shuffle.cpp"
"${LIBCXX_SOURCE_DIR}/src/regex.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2fixed.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2s.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/f2s.cpp"
"${LIBCXX_SOURCE_DIR}/src/shared_mutex.cpp"
"${LIBCXX_SOURCE_DIR}/src/stdexcept.cpp"
"${LIBCXX_SOURCE_DIR}/src/string.cpp"
@ -49,7 +54,9 @@ set(SRCS
add_library(cxx ${SRCS})
set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
target_include_directories(cxx SYSTEM BEFORE PUBLIC
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}>/src)
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
# Enable capturing stack traces for all exceptions.

contrib/libcxxabi vendored

@ -1 +1 @@
Subproject commit df8f1e727dbc9e2bedf2282096fa189dc3fe0076
Subproject commit 6eb7cc7a7bdd779e6734d1b9fb451df2274462d7

View File

@ -1,24 +1,24 @@
set(LIBCXXABI_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi")
set(SRCS
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_stdexcept.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_virtual.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/fallback_malloc.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_default_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/abort_message.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_aux_runtime.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_default_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_demangle.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_exception_storage.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/private_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_aux_runtime.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_vector.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_virtual.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/fallback_malloc.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/private_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_new_delete.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_stdexcept.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_typeinfo.cpp"
)
add_library(cxxabi ${SRCS})
@ -30,6 +30,7 @@ target_compile_options(cxxabi PRIVATE -w)
target_include_directories(cxxabi SYSTEM BEFORE
PUBLIC $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/include>
PRIVATE $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/../libcxx/include>
PRIVATE $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/../libcxx/src>
)
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.

contrib/replxx vendored

@ -1 +1 @@
Subproject commit 9460e5e0fc10f78f460af26a6bd928798cac864d
Subproject commit 6f0b6f151ae2a044625ae93acd19ca365fcea64d

View File

@ -163,6 +163,7 @@ def parse_env_variables(
cmake_flags.append("-DCMAKE_INSTALL_PREFIX=/usr")
cmake_flags.append("-DCMAKE_INSTALL_SYSCONFDIR=/etc")
cmake_flags.append("-DCMAKE_INSTALL_LOCALSTATEDIR=/var")
cmake_flags.append("-DBUILD_STANDALONE_KEEPER=ON")
if is_release_build(build_type, package_type, sanitizer, split_binary):
cmake_flags.append("-DINSTALL_STRIPPED_BINARIES=ON")
@ -244,7 +245,7 @@ def parse_env_variables(
result.append(f"AUTHOR='{author}'")
if official:
cmake_flags.append("-DYANDEX_OFFICIAL_BUILD=1")
cmake_flags.append("-DCLICKHOUSE_OFFICIAL_BUILD=1")
result.append('CMAKE_FLAGS="' + " ".join(cmake_flags) + '"')

View File

@ -267,6 +267,7 @@ function run_tests
local test_opts=(
--hung-check
--fast-tests-only
--no-random-settings
--no-long
--testname
--shard

View File

@ -13,7 +13,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
echo "$script_dir"
repo_dir=ch
BINARY_TO_DOWNLOAD=${BINARY_TO_DOWNLOAD:="clang-13_debug_none_bundled_unsplitted_disable_False_binary"}
BINARY_URL_TO_DOWNLOAD=${BINARY_URL_TO_DOWNLOAD:="https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"}
BINARY_URL_TO_DOWNLOAD=${BINARY_URL_TO_DOWNLOAD:="https://clickhouse-builds.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"}
function clone
{

View File

@ -2,7 +2,7 @@
set -euo pipefail
CLICKHOUSE_PACKAGE=${CLICKHOUSE_PACKAGE:="https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/clang-13_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"}
CLICKHOUSE_PACKAGE=${CLICKHOUSE_PACKAGE:="https://clickhouse-builds.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/clang-13_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"}
CLICKHOUSE_REPO_PATH=${CLICKHOUSE_REPO_PATH:=""}
@ -10,7 +10,7 @@ if [ -z "$CLICKHOUSE_REPO_PATH" ]; then
CLICKHOUSE_REPO_PATH=ch
rm -rf ch ||:
mkdir ch ||:
wget -nv -nd -c "https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/repo/clickhouse_no_subs.tar.gz"
wget -nv -nd -c "https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/repo/clickhouse_no_subs.tar.gz"
tar -C ch --strip-components=1 -xf clickhouse_no_subs.tar.gz
ls -lath ||:
fi

View File

@ -1294,15 +1294,15 @@ create table ci_checks engine File(TSVWithNamesAndTypes, 'ci-checks.tsv')
select '' test_name,
'$(sed -n 's/.*<!--message: \(.*\)-->/\1/p' report.html)' test_status,
0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
union all
select test || ' #' || toString(query_index), 'slower' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
|| test || '.' || toString(query_index) report_url
from queries where changed_fail != 0 and diff > 0
union all
select test || ' #' || toString(query_index), 'unstable' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
|| test || '.' || toString(query_index) report_url
from queries where unstable_fail != 0
)

View File

@ -16,26 +16,17 @@ right_sha=$4
datasets=${CHPC_DATASETS-"hits1 hits10 hits100 values"}
declare -A dataset_paths
if [[ $S3_URL == *"s3.amazonaws.com"* ]]; then
dataset_paths["hits10"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.amazonaws.com/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.amazonaws.com/values_with_expressions/partitions/test_values.tar"
else
dataset_paths["hits10"]="https://s3.mds.yandex.net/clickhouse-private-datasets/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://s3.mds.yandex.net/clickhouse-private-datasets/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.yandex.net/values_with_expressions/partitions/test_values.tar"
fi
dataset_paths["hits10"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.amazonaws.com/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.amazonaws.com/values_with_expressions/partitions/test_values.tar"
function download
{
# Historically there were various paths for the performance test package.
# Test all of them.
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/$left_pr/$left_sha/performance/performance.tgz"
"https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/clickhouse_build_check/performance/performance.tgz"
)
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/$left_pr/$left_sha/performance/performance.tgz")
for path in "${urls_to_try[@]}"
do

View File

@ -4,7 +4,7 @@ set -ex
CHPC_CHECK_START_TIMESTAMP="$(date +%s)"
export CHPC_CHECK_START_TIMESTAMP
S3_URL=${S3_URL:="https://clickhouse-builds.s3.yandex.net"}
S3_URL=${S3_URL:="https://clickhouse-builds.s3.amazonaws.com"}
COMMON_BUILD_PREFIX="/clickhouse_build_check"
if [[ $S3_URL == *"s3.amazonaws.com"* ]]; then
@ -64,9 +64,7 @@ function find_reference_sha
# Historically there were various paths for the performance test package,
# test all of them.
unset found
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/0/$REF_SHA/performance/performance.tgz"
"https://clickhouse-builds.s3.yandex.net/0/$REF_SHA/clickhouse_build_check/performance/performance.tgz"
)
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/0/$REF_SHA/performance/performance.tgz")
for path in "${urls_to_try[@]}"
do
if curl_with_retry "$path"

View File

@ -11,7 +11,7 @@ RUN apt-get update -y \
COPY s3downloader /s3downloader
ENV S3_URL="https://clickhouse-datasets.s3.yandex.net"
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV DATASETS="hits visits"
ENV EXPORT_S3_STORAGE_POLICIES=1

View File

@ -10,7 +10,7 @@ import requests
import tempfile
DEFAULT_URL = 'https://clickhouse-datasets.s3.yandex.net'
DEFAULT_URL = 'https://clickhouse-datasets.s3.amazonaws.com'
AVAILABLE_DATASETS = {
'hits': 'hits_v1.tar',

View File

@ -41,6 +41,7 @@ sleep 5
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
./mc policy set public clickminio/test
# Upload data to Minio. By default, after unpacking, all tests will be in

View File

@ -29,7 +29,7 @@ COPY ./download_previous_release /download_previous_release
COPY run.sh /
ENV DATASETS="hits visits"
ENV S3_URL="https://clickhouse-datasets.s3.yandex.net"
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV EXPORT_S3_STORAGE_POLICIES=1
CMD ["/bin/bash", "/run.sh"]

View File

@ -1616,3 +1616,14 @@ Possible values:
Default value: `10000`.
## global_memory_usage_overcommit_max_wait_microseconds {#global_memory_usage_overcommit_max_wait_microseconds}
Sets the maximum waiting time for the global overcommit tracker.
Possible values:
- Positive integer.
Default value: `0`.

View File

@ -0,0 +1,31 @@
# Memory overcommit
Memory overcommit is an experimental technique intended to allow setting more flexible memory limits for queries.
The idea of this technique is to introduce settings which represent the guaranteed amount of memory a query can use.
When memory overcommit is enabled and the memory limit is reached, ClickHouse will select the most overcommitted query and try to free memory by killing it.
When the memory limit is reached, any query will wait for some time while attempting to allocate new memory.
If memory is freed within the timeout, the query continues execution. Otherwise, an exception is thrown and the query is killed.
Selection of the query to stop or kill is performed by either the global or the user overcommit tracker, depending on which memory limit is reached.
## User overcommit tracker
The user overcommit tracker finds the query with the biggest overcommit ratio in the user's query list.
The overcommit ratio is computed as the number of allocated bytes divided by the value of the `max_guaranteed_memory_usage` setting.
The waiting timeout is set by the `memory_usage_overcommit_max_wait_microseconds` setting.
**Example**
```sql
SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage=4000, memory_usage_overcommit_max_wait_microseconds=500
```
## Global overcommit tracker
The global overcommit tracker finds the query with the biggest overcommit ratio in the list of all queries.
In this case the overcommit ratio is computed as the number of allocated bytes divided by the value of the `max_guaranteed_memory_usage_for_user` setting.
The waiting timeout is set by the `global_memory_usage_overcommit_max_wait_microseconds` parameter in the configuration file.
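**Example**

A sketch in the same spirit as the user-tracker example above; the per-user soft limit that the global tracker compares against is set inline (the values are illustrative, not recommendations):

```sql
SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user=4000
```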

View File

@ -4220,10 +4220,36 @@ Possible values:
- 0 — Disabled.
- 1 — Enabled. The wait time equals the `shutdown_wait_unfinished` config value.
Default value: 0.
Default value: `0`.
## shutdown_wait_unfinished
The waiting time in seconds for currently handled connections during server shutdown.
Default Value: 5.
Default value: `5`.
## max_guaranteed_memory_usage
Maximum guaranteed memory usage for processing a single query.
It represents a soft limit in case the hard limit is reached at the user level.
Zero means unlimited.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
## memory_usage_overcommit_max_wait_microseconds
Maximum time a thread will wait for memory to be freed in the case of memory overcommit at the user level.
If the timeout is reached and memory is not freed, an exception is thrown.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
## max_guaranteed_memory_usage_for_user
Maximum guaranteed memory usage for processing all concurrently running queries for the user.
It represents a soft limit in case the hard limit is reached at the global level.
Zero means unlimited.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
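Taken together, a hedged sketch of tuning both soft limits and the user-level timeout on a single query (values are illustrative only):

```sql
SELECT number
FROM numbers(1000)
GROUP BY number
SETTINGS
    max_guaranteed_memory_usage = 4000,
    max_guaranteed_memory_usage_for_user = 8000,
    memory_usage_overcommit_max_wait_microseconds = 500
```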

View File

@ -0,0 +1,28 @@
# package sources should be placed in ${PWD}/root
# nfpm should run from the same directory as the config
name: "clickhouse-keeper-dbg"
arch: "${DEB_ARCH}" # amd64, arm64
platform: "linux"
version: "${CLICKHOUSE_VERSION_STRING}"
vendor: "ClickHouse Inc."
homepage: "https://clickhouse.com"
license: "Apache"
section: "database"
priority: "optional"
maintainer: "ClickHouse Dev Team <packages+linux@clickhouse.com>"
description: |
debugging symbols for clickhouse-keeper
This package contains the debugging symbols for clickhouse-keeper.
contents:
- src: root/usr/lib/debug/usr/bin/clickhouse-keeper.debug
dst: /usr/lib/debug/usr/bin/clickhouse-keeper.debug
# docs
- src: ../AUTHORS
dst: /usr/share/doc/clickhouse-keeper-dbg/AUTHORS
- src: ../CHANGELOG.md
dst: /usr/share/doc/clickhouse-keeper-dbg/CHANGELOG.md
- src: ../LICENSE
dst: /usr/share/doc/clickhouse-keeper-dbg/LICENSE
- src: ../README.md
dst: /usr/share/doc/clickhouse-keeper-dbg/README.md

View File

@ -0,0 +1,40 @@
# package sources should be placed in ${PWD}/root
# nfpm should run from the same directory as the config
name: "clickhouse-keeper"
arch: "${DEB_ARCH}" # amd64, arm64
platform: "linux"
version: "${CLICKHOUSE_VERSION_STRING}"
vendor: "ClickHouse Inc."
homepage: "https://clickhouse.com"
license: "Apache"
section: "database"
priority: "optional"
conflicts:
- clickhouse-server
depends:
- adduser
suggests:
- clickhouse-keeper-dbg
maintainer: "ClickHouse Dev Team <packages+linux@clickhouse.com>"
description: |
Static clickhouse-keeper binary
A stand-alone clickhouse-keeper package
contents:
- src: root/etc/clickhouse-keeper
dst: /etc/clickhouse-keeper
type: config
- src: root/usr/bin/clickhouse-keeper
dst: /usr/bin/clickhouse-keeper
# docs
- src: ../AUTHORS
dst: /usr/share/doc/clickhouse-keeper/AUTHORS
- src: ../CHANGELOG.md
dst: /usr/share/doc/clickhouse-keeper/CHANGELOG.md
- src: ../LICENSE
dst: /usr/share/doc/clickhouse-keeper/LICENSE
- src: ../README.md
dst: /usr/share/doc/clickhouse-keeper/README.md

View File

@ -71,17 +71,11 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferFromFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedWriteBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDoubleDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecEncrypted.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecGorilla.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecLZ4.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecMultiple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecNone.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecT64.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecZSTD.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/getCompressionCodecForFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/ICompressionCodec.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/LZ4_decompress_faster.cpp

View File

@ -13,7 +13,7 @@ enum class QuotaType
{
QUERIES, /// Number of queries.
QUERY_SELECTS, /// Number of select queries.
QUERY_INSERTS, /// Number of inserts queries.
QUERY_INSERTS, /// Number of insert queries.
ERRORS, /// Number of queries with exceptions.
RESULT_ROWS, /// Number of rows returned as result.
RESULT_BYTES, /// Number of bytes returned as result.

View File

@ -67,7 +67,7 @@ auto parseArguments(const std::string & name, const DataTypes & arguments)
values_types.push_back(array_type->getNestedType());
}
return std::tuple{std::move(keys_type), std::move(values_types), tuple_argument};
return std::tuple<DataTypePtr, DataTypes, bool>{std::move(keys_type), std::move(values_types), tuple_argument};
}
// This function instantiates a particular overload of the sumMap family of

View File

@ -573,10 +573,6 @@ if (ENABLE_TESTS)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::simdjson)
endif()
if(TARGET ch_contrib::rapidjson)
target_include_directories(unit_tests_dbms PRIVATE ch_contrib::rapidjson)
endif()
if (TARGET ch_contrib::yaml_cpp)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::yaml_cpp)
endif()

View File

@ -521,7 +521,7 @@ ColumnObject::ColumnObject(bool is_nullable_)
{
}
ColumnObject::ColumnObject(SubcolumnsTree && subcolumns_, bool is_nullable_)
ColumnObject::ColumnObject(Subcolumns && subcolumns_, bool is_nullable_)
: is_nullable(is_nullable_)
, subcolumns(std::move(subcolumns_))
, num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size())
@ -696,7 +696,7 @@ const ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & ke
ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & key)
{
if (const auto * node = subcolumns.findLeaf(key))
return const_cast<SubcolumnsTree::Node *>(node)->data;
return const_cast<Subcolumns::Node *>(node)->data;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in ColumnObject", key.getPath());
}
@ -794,7 +794,7 @@ bool ColumnObject::isFinalized() const
void ColumnObject::finalize()
{
size_t old_size = size();
SubcolumnsTree new_subcolumns;
Subcolumns new_subcolumns;
for (auto && entry : subcolumns)
{
const auto & least_common_type = entry->data.getLeastCommonType();

View File

@ -138,20 +138,20 @@ public:
size_t num_of_defaults_in_prefix = 0;
};
using SubcolumnsTree = SubcolumnsTree<Subcolumn>;
using Subcolumns = SubcolumnsTree<Subcolumn>;
private:
/// If true then all subcolumns are nullable.
const bool is_nullable;
SubcolumnsTree subcolumns;
Subcolumns subcolumns;
size_t num_rows;
public:
static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
explicit ColumnObject(bool is_nullable_);
ColumnObject(SubcolumnsTree && subcolumns_, bool is_nullable_);
ColumnObject(Subcolumns && subcolumns_, bool is_nullable_);
/// Checks that all subcolumns have consistent sizes.
void checkConsistency() const;
@ -173,8 +173,8 @@ public:
/// It cares about consistency of sizes of Nested arrays.
void addNestedSubcolumn(const PathInData & key, const FieldInfo & field_info, size_t new_size);
const SubcolumnsTree & getSubcolumns() const { return subcolumns; }
SubcolumnsTree & getSubcolumns() { return subcolumns; }
const Subcolumns & getSubcolumns() const { return subcolumns; }
Subcolumns & getSubcolumns() { return subcolumns; }
PathsInData getKeys() const;
/// Finalizes all subcolumns.

View File

@ -437,6 +437,7 @@ String FileSegment::stateToString(FileSegment::State state)
case FileSegment::State::SKIP_CACHE:
return "SKIP_CACHE";
}
__builtin_unreachable();
}
String FileSegmentsHolder::toString()

View File

@ -23,6 +23,12 @@ void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
{
// NOTE: Do not change the order of locks
//
// global_mutex must be acquired before overcommit_m, because
// method OvercommitTracker::unsubscribe(MemoryTracker *) is
// always called with global_mutex already acquired in
// ProcessListEntry::~ProcessListEntry().
std::unique_lock<std::mutex> global_lock(global_mutex);
std::unique_lock<std::mutex> lk(overcommit_m);
@ -76,7 +82,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
MemoryTracker * query_tracker = nullptr;
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only.
// BlockQueryIfMemoryLimit is used in ProcessList to guarantee this.
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
auto & queries = user_process_list->queries;
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", queries.size());
for (auto const & query : queries)
@ -111,9 +117,9 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
MemoryTracker * query_tracker = nullptr;
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only.
// BlockQueryIfMemoryLimit is used in ProcessList to guarantee this.
LOG_DEBUG(logger, "Trying to choose query to stop");
process_list->processEachQueryStatus([&](DB::QueryStatus const & query)
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", process_list->size());
for (auto const & query : process_list->processes)
{
if (query.isKilled())
return;
@ -134,7 +140,7 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
query_tracker = memory_tracker;
current_ratio = ratio;
}
});
}
LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}",
current_ratio.committed, current_ratio.soft_limit);
picked_tracker = query_tracker;

View File

@ -43,8 +43,6 @@ class MemoryTracker;
// is killed to free memory.
struct OvercommitTracker : boost::noncopyable
{
explicit OvercommitTracker(std::mutex & global_mutex_);
void setMaxWaitTime(UInt64 wait_time);
bool needToStopQuery(MemoryTracker * tracker);
@ -54,8 +52,12 @@ struct OvercommitTracker : boost::noncopyable
virtual ~OvercommitTracker() = default;
protected:
explicit OvercommitTracker(std::mutex & global_mutex_);
virtual void pickQueryToExcludeImpl() = 0;
// This mutex is used to disallow concurrent access
// to picked_tracker and cancelation_state variables.
mutable std::mutex overcommit_m;
mutable std::condition_variable cv;
@ -87,6 +89,11 @@ private:
}
}
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
// require this mutex to be locked, because they read list (or sublist)
// of queries.
std::mutex & global_mutex;
};

View File

@ -9,6 +9,7 @@
M(SelectQuery, "Same as Query, but only for SELECT queries.") \
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \
M(FailedQuery, "Number of failed queries.") \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \

View File

@ -165,25 +165,36 @@ void registerCodecNone(CompressionCodecFactory & factory);
void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
/// Keeper uses only general-purpose codecs, so we don't need these specialized codecs
/// in the standalone build
#ifndef KEEPER_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
#endif
CompressionCodecFactory::CompressionCodecFactory()
{
registerCodecLZ4(*this);
registerCodecNone(*this);
registerCodecLZ4(*this);
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);
registerCodecGorilla(*this);
registerCodecEncrypted(*this);
registerCodecMultiple(*this);
#endif
default_codec = get("LZ4", {});
}
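For context, a minimal sketch of where the specialized codecs are referenced: table DDL like the following, which a standalone Keeper build never has to parse (table and column names are hypothetical):

```sql
CREATE TABLE codec_demo
(
    ts    DateTime CODEC(DoubleDelta, LZ4),
    value Float64  CODEC(Gorilla, ZSTD)
)
ENGINE = MergeTree
ORDER BY ts;
```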

View File

@ -63,12 +63,12 @@ private:
size_t num_dimensions_to_keep;
};
using Node = typename ColumnObject::SubcolumnsTree::Node;
using Node = typename ColumnObject::Subcolumns::Node;
/// Finds a subcolumn from the same Nested type as @entry and inserts
/// an array with default values with consistent sizes as in Nested type.
bool tryInsertDefaultFromNested(
const std::shared_ptr<Node> & entry, const ColumnObject::SubcolumnsTree & subcolumns)
const std::shared_ptr<Node> & entry, const ColumnObject::Subcolumns & subcolumns)
{
if (!entry->path.hasNested())
return false;
@ -198,7 +198,7 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
deserializeTextImpl(column, [&](String & s) { readEscapedStringInto(s, istr); });
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); });
}
template <typename Parser>

View File

@ -96,6 +96,7 @@ private:
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
return "REMOTE_FS_READ_AND_PUT_IN_CACHE";
}
__builtin_unreachable();
}
size_t first_offset = 0;
};

View File

@ -13,6 +13,7 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory);
void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory);
/// Formats for both input/output.
@ -103,6 +104,7 @@ void registerProtobufSchemaReader(FormatFactory & factory);
void registerProtobufListSchemaReader(FormatFactory & factory);
void registerLineAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsObjectSchemaReader(FormatFactory & factory);
void registerRawBLOBSchemaReader(FormatFactory & factory);
void registerMsgPackSchemaReader(FormatFactory & factory);
void registerCapnProtoSchemaReader(FormatFactory & factory);
@ -123,6 +125,7 @@ void registerFormats()
registerFileSegmentationEngineJSONEachRow(factory);
registerFileSegmentationEngineRegexp(factory);
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineJSONAsObject(factory);
registerFileSegmentationEngineJSONCompactEachRow(factory);
registerInputFormatNative(factory);
@ -207,6 +210,7 @@ void registerFormats()
registerProtobufListSchemaReader(factory);
registerLineAsStringSchemaReader(factory);
registerJSONAsStringSchemaReader(factory);
registerJSONAsObjectSchemaReader(factory);
registerRawBLOBSchemaReader(factory);
registerMsgPackSchemaReader(factory);
registerCapnProtoSchemaReader(factory);

View File

@ -43,6 +43,9 @@ public:
for (size_t i = 2; i < args.size() - 1; i += 2)
dst_array_types.push_back(args[i]);
// Type of the ELSE branch
dst_array_types.push_back(args.back());
return getLeastSupertype(dst_array_types);
}

View File

@ -663,7 +663,7 @@ public:
Range range{from, to};
from = to;
return std::move(range);
return range;
}
private:

View File

@ -32,6 +32,7 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event AsyncInsertQuery;
extern const Event AsyncInsertBytes;
}
namespace DB
@ -222,7 +223,9 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
if (!data)
data = std::make_unique<InsertData>();
data->size += entry->bytes.size();
size_t entry_data_size = entry->bytes.size();
data->size += entry_data_size;
data->last_update = std::chrono::steady_clock::now();
data->entries.emplace_back(entry);
@ -239,6 +242,7 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)

View File

@ -100,20 +100,9 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
{
auto columns = select_query->select()->children;
const auto * group_by_expr_with_alias = dynamic_cast<const ASTWithAlias *>(argument.get());
if (group_by_expr_with_alias && !group_by_expr_with_alias->alias.empty())
{
for (const auto & column : columns)
{
const auto * col_with_alias = dynamic_cast<const ASTWithAlias *>(column.get());
if (col_with_alias)
{
const auto & alias = col_with_alias->alias;
if (!alias.empty() && alias == group_by_expr_with_alias->alias)
return false;
}
}
}
const auto * expr_with_alias = dynamic_cast<const ASTWithAlias *>(argument.get());
if (expr_with_alias && !expr_with_alias->alias.empty())
return false;
const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get());
if (!ast_literal)
@ -130,7 +119,7 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
pos, columns.size());
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()))
if (typeid_cast<const ASTIdentifier *>(column.get()) || typeid_cast<const ASTLiteral *>(column.get()))
{
argument = column->clone();
}
@ -1324,7 +1313,9 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChai
throw Exception("Bad ORDER BY expression AST", ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE);
if (getContext()->getSettingsRef().enable_positional_arguments)
{
replaceForPositionalArguments(ast->children.at(0), select_query, ASTSelectQuery::Expression::ORDER_BY);
}
}
getRootActions(select_query->orderBy(), only_types, step.actions());
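As a reference point for the hunks above, a minimal sketch of positional arguments in action (illustrative query, not taken from the change):

```sql
SET enable_positional_arguments = 1;
-- GROUP BY 1 and ORDER BY 1 resolve to the first SELECT expression
SELECT number % 2 AS parity, count()
FROM numbers(10)
GROUP BY 1
ORDER BY 1;
```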

View File

@ -962,18 +962,29 @@ public:
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
{
const auto & column = *block.getByPosition(right_indexes[j]).column;
if (auto * nullable_col = typeid_cast<ColumnNullable *>(columns[j].get()); nullable_col && !column.isNullable())
nullable_col->insertFromNotNullable(column, row_num);
auto column_from_block = block.getByPosition(right_indexes[j]);
if (type_name[j].type->lowCardinality() != column_from_block.type->lowCardinality())
{
JoinCommon::changeLowCardinalityInplace(column_from_block);
}
if (auto * nullable_col = typeid_cast<ColumnNullable *>(columns[j].get());
nullable_col && !column_from_block.column->isNullable())
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(column, row_num);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
{
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
auto column_from_block = block.getByPosition(right_indexes[j]);
if (type_name[j].type->lowCardinality() != column_from_block.type->lowCardinality())
{
JoinCommon::changeLowCardinalityInplace(column_from_block);
}
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
@ -1013,6 +1024,7 @@ private:
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
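A hedged SQL sketch of the mismatch the LowCardinality handling above addresses: the join key is LowCardinality on one side and plain on the other, so the column must be converted in place before insertFrom (query is illustrative):

```sql
-- Left key is LowCardinality(String), right key is plain String.
SELECT *
FROM (SELECT toLowCardinality('x') AS k) AS l
INNER JOIN (SELECT 'x' AS k) AS r USING (k);
```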

View File

@ -150,12 +150,12 @@ static ColumnsDescription createColumnsDescription(const NamesAndTypesList & col
ColumnsDescription columns_description;
for (
auto [column_name_and_type, declare_column_ast] = std::tuple{columns_name_and_type.begin(), columns_definition->children.begin()};
column_name_and_type != columns_name_and_type.end();
column_name_and_type++,
declare_column_ast++
)
/// FIXME: we could write it like auto [a, b] = std::tuple(x, y),
/// but this produces endless recursion in gcc-11 and leads to SIGSEGV
/// (see git blame for details).
auto column_name_and_type = columns_name_and_type.begin();
auto declare_column_ast = columns_definition->children.begin();
for (; column_name_and_type != columns_name_and_type.end(); column_name_and_type++, declare_column_ast++)
{
const auto & declare_column = (*declare_column_ast)->as<MySQLParser::ASTDeclareColumn>();
String comment;

View File

@ -351,15 +351,6 @@ public:
max_size = max_size_;
}
// Before calling this method you should be sure
// that lock is acquired.
template <typename F>
void processEachQueryStatus(F && func) const
{
for (auto && query : processes)
func(query);
}
void setMaxInsertQueriesAmount(size_t max_insert_queries_amount_)
{
std::lock_guard lock(mutex);

View File

@ -192,7 +192,7 @@ private:
using Result = Element;
static TKey & extractKey(Element & elem) { return elem.value; }
static Element extractResult(Element & elem) { return elem; }
static Result extractResult(Element & elem) { return elem; }
};
if constexpr (is_descending)

View File

@ -345,7 +345,10 @@ void replaceWithSumCount(String column_name, ASTFunction & func)
{
/// Rewrite "avg" to sumCount().1 / sumCount().2
auto new_arg1 = makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(1)));
auto new_arg2 = makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(2)));
auto new_arg2 = makeASTFunction("CAST",
makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(2))),
std::make_shared<ASTLiteral>("Float64"));
func.name = "divide";
exp_list->children.push_back(new_arg1);
exp_list->children.push_back(new_arg2);
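A short illustration of why the count is cast to Float64: `avg` must return Float64 even over integer arguments, so the rewritten `divide` must not perform integer division (example values only):

```sql
-- avg(0..9) = 4.5, i.e. Float64; the fused rewrite
-- divide(sumCount().1, CAST(sumCount().2, 'Float64')) must match this.
SELECT avg(number) FROM numbers(10);
```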

View File

@ -607,6 +607,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (async_insert)
{
quota = context->getQuota();
if (quota)
{
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
quota->checkExceeded(QuotaType::ERRORS);
}
queue->push(ast, context);
if (settings.wait_for_async_insert)
@ -617,13 +625,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}
quota = context->getQuota();
if (quota)
{
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
}
const auto & table_id = insert_query->table_id;
if (!table_id.empty())
context->setInsertionTable(table_id);
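A hypothetical quota that the relocated accounting now charges before the asynchronous INSERT is enqueued (quota and user names are made up for illustration):

```sql
CREATE QUOTA async_insert_quota
    FOR INTERVAL 1 hour MAX query_inserts = 1000
    TO some_user;
```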

View File

@ -326,9 +326,10 @@ ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
for (const auto & column_name : names)
{
auto & column = block.getByName(column_name).column;
column = recursiveRemoveLowCardinality(column->convertToFullColumnIfConst());
ptrs[column_name] = column.get();
auto & column = block.getByName(column_name);
column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
column.type = recursiveRemoveLowCardinality(column.type);
ptrs[column_name] = column.column.get();
}
return ptrs;

View File

@ -25,7 +25,7 @@ CallbackRunner threadPoolCallbackRunner(ThreadPool & pool)
/// Usually it could be ok, because thread pool task is executed before user-level memory tracker is destroyed.
/// However, the thread could stay alive inside the thread pool, and its ThreadStatus as well.
/// When, finally, we destroy the thread (and the ThreadStatus),
/// it can use memory tracker in the ~ThreadStatus in order to alloc/free untracked_memory,\
/// it can use memory tracker in the ~ThreadStatus in order to alloc/free untracked_memory,
/// and by this time user-level memory tracker may be already destroyed.
///
/// As a work-around, reset memory tracker to total, which is always alive.

View File

@ -228,6 +228,14 @@ void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factor
factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
void registerJSONAsStringSchemaReader(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsString", [](const FormatSettings &)
{
return std::make_shared<JSONAsStringExternalSchemaReader>();
});
}
void registerInputFormatJSONAsObject(FormatFactory & factory)
{
factory.registerInputFormat("JSONAsObject", [](
@ -245,11 +253,16 @@ void registerNonTrivialPrefixAndSuffixCheckerJSONAsObject(FormatFactory & factor
factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsObject", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
void registerJSONAsStringSchemaReader(FormatFactory & factory)
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsString", [](const FormatSettings &)
factory.registerFileSegmentationEngine("JSONAsObject", &fileSegmentationEngineJSONEachRow);
}
void registerJSONAsObjectSchemaReader(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsObject", [](const FormatSettings &)
{
return std::make_shared<JSONAsStringExternalSchemaReader>();
return std::make_shared<JSONAsObjectExternalSchemaReader>();
});
}

View File

@ -5,6 +5,7 @@
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeObject.h>
namespace DB
{
@ -73,4 +74,13 @@ public:
}
};
class JSONAsObjectExternalSchemaReader : public IExternalSchemaReader
{
public:
NamesAndTypesList readSchema() override
{
return {{"json", std::make_shared<DataTypeObject>("json", false)}};
}
};
}
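A hedged end-to-end sketch of the new JSONAsObject schema reader: each input document becomes one row in the single `json` column it declares (assumes the experimental Object type is enabled; the table name is hypothetical):

```sql
SET allow_experimental_object_type = 1;
CREATE TABLE json_as_object (json Object('json')) ENGINE = Memory;
INSERT INTO json_as_object FORMAT JSONAsObject {"a": 1, "b": {"c": "x"}}
SELECT json.a, json.b.c FROM json_as_object;
```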

View File

@ -1,16 +1,22 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/WriteBuffer.h>
#include <IO/Operators.h>
#include <stack>
#include <Common/JSONBuilder.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <stack>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Common/JSONBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
@ -388,6 +394,7 @@ void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & opt
static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings)
{
settings.out << String(settings.offset, settings.indent_char) << "(" << step.getName() << ")\n";
size_t current_offset = settings.offset;
step.describePipeline(settings);
if (current_offset == settings.offset)

View File

@ -112,6 +112,9 @@ ReadFromMergeTree::ReadFromMergeTree(
if (enable_parallel_reading)
read_task_callback = context->getMergeTreeReadTaskCallback();
/// Add explicit description.
setStepDescription(data.getStorageID().getFullNameNotQuoted());
}
Pipe ReadFromMergeTree::readFromPool(
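With the explicit step description set above, EXPLAIN output names the table being read, roughly like this (table name hypothetical):

```sql
EXPLAIN SELECT * FROM db.events;
-- ...
--   ReadFromMergeTree (db.events)
```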

View File

@ -100,7 +100,8 @@ public:
bool enable_parallel_reading
);
String getName() const override { return "ReadFromMergeTree"; }
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

View File

@ -488,7 +488,7 @@ auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int64_t offset)
}
}
return std::tuple{x, offset};
return std::tuple<RowNumber, int64_t>{x, offset};
}
auto WindowTransform::moveRowNumber(const RowNumber & _x, int64_t offset) const
@ -505,7 +505,7 @@ auto WindowTransform::moveRowNumber(const RowNumber & _x, int64_t offset) const
assert(oo == 0);
#endif
return std::tuple{x, o};
return std::tuple<RowNumber, int64_t>{x, o};
}

View File

@ -325,6 +325,7 @@ void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration
compression_method = conf.compression_method;
structure = conf.structure;
http_method = conf.http_method;
headers = conf.headers;
}
@ -364,6 +365,10 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
{
configuration.structure = config.getString(config_prefix + ".structure", "");
}
else if (key == "compression_method")
{
configuration.compression_method = config.getString(config_prefix + ".compression_method", "");
}
else if (key == "headers")
{
Poco::Util::AbstractConfiguration::Keys header_keys;

View File

@ -114,6 +114,12 @@ struct StorageS3Configuration : URLBasedDataSourceConfiguration
String secret_access_key;
};
struct StorageS3ClusterConfiguration : StorageS3Configuration
{
String cluster_name;
};
struct URLBasedDataSourceConfig
{
URLBasedDataSourceConfiguration configuration;

View File

@ -279,14 +279,17 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
LOG_ERROR(log,
"{}. Data after merge is not byte-identical to data on another replicas. There could be several"
" reasons: 1. Using newer version of compression library after server update. 2. Using another"
" compression method. 3. Non-deterministic compression algorithm (highly unlikely). 4."
" Non-deterministic merge algorithm due to logical error in code. 5. Data corruption in memory due"
" to bug in code. 6. Data corruption in memory due to hardware issue. 7. Manual modification of"
" source data after server startup. 8. Manual modification of checksums stored in ZooKeeper. 9."
" Part format related settings like 'enable_mixed_granularity_parts' are different on different"
" replicas. We will download merged part from replica to force byte-identical result.",
"{}. Data after merge is not byte-identical to data on another replicas. There could be several reasons:"
" 1. Using newer version of compression library after server update."
" 2. Using another compression method."
" 3. Non-deterministic compression algorithm (highly unlikely)."
" 4. Non-deterministic merge algorithm due to logical error in code."
" 5. Data corruption in memory due to bug in code."
" 6. Data corruption in memory due to hardware issue."
" 7. Manual modification of source data after server startup."
" 8. Manual modification of checksums stored in ZooKeeper."
" 9. Part format related settings like 'enable_mixed_granularity_parts' are different on different replicas."
" We will download merged part from replica to force byte-identical result.",
getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException());

View File

@ -185,7 +185,8 @@ bool MutateFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrit
ProfileEvents::increment(ProfileEvents::DataAfterMutationDiffersFromReplica);
LOG_ERROR(log, "{}. Data after mutation is not byte-identical to data on another replicas. We will download merged part from replica to force byte-identical result.", getCurrentExceptionMessage(false));
LOG_ERROR(log, "{}. Data after mutation is not byte-identical to data on another replicas. "
"We will download merged part from replica to force byte-identical result.", getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException());

View File

@ -136,7 +136,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
IMergeTreeDataPart::Checksums projection_checksums_data;
const auto & projection_path = file_path;
if (part_type == MergeTreeDataPartType::COMPACT)
if (projection->getType() == MergeTreeDataPartType::COMPACT)
{
auto proj_path = file_path + MergeTreeDataPartCompact::DATA_FILE_NAME_WITH_EXTENSION;
auto file_buf = disk->readFile(proj_path);

View File

@ -98,8 +98,24 @@ MaterializedPostgreSQLConsumer::StorageData::Buffer::Buffer(
}
void MaterializedPostgreSQLConsumer::assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx)
{
if (column_idx >= buffer.description.sample_block.columns()
|| column_idx >= buffer.description.types.size()
|| column_idx >= buffer.columns.size())
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Attempt to insert into buffer at position: {}, but block columns size is {}, types size: {}, columns size: {}, buffer structure: {}",
column_idx,
buffer.description.sample_block.columns(), buffer.description.types.size(), buffer.columns.size(),
buffer.description.sample_block.dumpStructure());
}
void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, const std::string & value, size_t column_idx)
{
assertCorrectInsertion(buffer, column_idx);
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
bool is_nullable = buffer.description.types[column_idx].second;
@ -134,6 +150,8 @@ void MaterializedPostgreSQLConsumer::insertValue(StorageData::Buffer & buffer, c
void MaterializedPostgreSQLConsumer::insertDefaultValue(StorageData::Buffer & buffer, size_t column_idx)
{
assertCorrectInsertion(buffer, column_idx);
const auto & sample = buffer.description.sample_block.getByPosition(column_idx);
insertDefaultPostgreSQLValue(*buffer.columns[column_idx], *sample.column);
}
@ -515,13 +533,14 @@ void MaterializedPostgreSQLConsumer::processReplicationMessage(const char * repl
void MaterializedPostgreSQLConsumer::syncTables()
{
try
for (const auto & table_name : tables_to_sync)
{
for (const auto & table_name : tables_to_sync)
{
auto & storage_data = storages.find(table_name)->second;
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
auto & storage_data = storages.find(table_name)->second;
Block result_rows = storage_data.buffer.description.sample_block.cloneWithColumns(std::move(storage_data.buffer.columns));
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
try
{
if (result_rows.rows())
{
auto storage = storage_data.storage;
@ -543,13 +562,18 @@ void MaterializedPostgreSQLConsumer::syncTables()
CompletedPipelineExecutor executor(io.pipeline);
executor.execute();
storage_data.buffer.columns = storage_data.buffer.description.sample_block.cloneEmptyColumns();
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
LOG_DEBUG(log, "Table sync end for {} tables, last lsn: {} = {}, (attempted lsn {})", tables_to_sync.size(), current_lsn, getLSNValue(current_lsn), getLSNValue(final_lsn));
try
{
auto tx = std::make_shared<pqxx::nontransaction>(connection->getRef());
current_lsn = advanceLSN(tx);
tables_to_sync.clear();
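
The restructuring above moves the try/catch inside the per-table loop: a failure while flushing one table is now logged and skipped instead of aborting the sync of every remaining table, and advancing the LSN gets its own try block afterwards. A sketch of the resulting control flow (Python, hypothetical helper names; the actual error handler is not shown in this hunk):

    def sync_tables(self):
        for table_name in self.tables_to_sync:
            storage_data = self.storages[table_name]
            # Detach buffered rows and reset the buffer up front, so the
            # buffer stays usable even if this table's flush fails.
            rows = storage_data.buffer.take_rows()
            try:
                if rows:
                    insert_into(storage_data.storage, rows)
            except Exception:
                log_current_exception()  # log and continue with the next table
        log.debug("Table sync end for %s tables", len(self.tables_to_sync))
        try:
            self.current_lsn = self.advance_lsn()
            self.tables_to_sync.clear()
        except Exception:
            log_current_exception()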

View File

@ -122,6 +122,8 @@ private:
void markTableAsSkipped(Int32 relation_id, const String & relation_name);
static void assertCorrectInsertion(StorageData::Buffer & buffer, size_t column_idx);
/// lsn - log sequence number, like WAL offset (64 bit).
static Int64 getLSNValue(const std::string & lsn)
{
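
For reference, PostgreSQL prints an LSN as two hex numbers separated by a slash: the high and low 32 bits of a 64-bit WAL offset. The conversion getLSNValue performs can be sketched in Python as:

    def get_lsn_value(lsn: str) -> int:
        """Parse a textual LSN like '16/B374D848' into its 64-bit value."""
        high, low = lsn.split('/')
        return (int(high, 16) << 32) + int(low, 16)

    assert get_lsn_value('0/0') == 0
    assert get_lsn_value('1/0') == 2 ** 32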

View File

@ -64,8 +64,8 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
bool is_attach_,
const MaterializedPostgreSQLSettings & replication_settings,
bool is_materialized_postgresql_database_)
: log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, context(context_)
: WithContext(context_->getGlobalContext())
, log(&Poco::Logger::get("PostgreSQLReplicationHandler"))
, is_attach(is_attach_)
, postgres_database(postgres_database_)
, postgres_schema(replication_settings.materialized_postgresql_schema)
@ -94,9 +94,9 @@ PostgreSQLReplicationHandler::PostgreSQLReplicationHandler(
}
publication_name = fmt::format("{}_ch_publication", replication_identifier);
startup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
cleanup_task = context->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ cleanupFunc(); });
startup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ checkConnectionAndStart(); });
consumer_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ consumerFunc(); });
cleanup_task = getContext()->getSchedulePool().createTask("PostgreSQLReplicaStartup", [this]{ cleanupFunc(); });
}
@ -296,7 +296,7 @@ void PostgreSQLReplicationHandler::startSynchronization(bool throw_on_error)
/// (Apart from the case, when shutdownFinal is called).
/// Handler uses it only for loadFromSnapshot and shutdown methods.
consumer = std::make_shared<MaterializedPostgreSQLConsumer>(
context,
getContext(),
std::move(tmp_connection),
replication_slot,
publication_name,
@ -921,9 +921,9 @@ void PostgreSQLReplicationHandler::reloadFromSnapshot(const std::vector<std::pai
for (const auto & [relation_id, table_name] : relation_data)
{
auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), context);
auto storage = DatabaseCatalog::instance().getTable(StorageID(current_database_name, table_name), getContext());
auto * materialized_storage = storage->as<StorageMaterializedPostgreSQL>();
auto materialized_table_lock = materialized_storage->lockForShare(String(), context->getSettingsRef().lock_acquire_timeout);
auto materialized_table_lock = materialized_storage->lockForShare(String(), getContext()->getSettingsRef().lock_acquire_timeout);
/// If for some reason this temporary table already exists - also drop it.
auto temp_materialized_storage = materialized_storage->createTemporary();
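
The theme of this file's changes: PostgreSQLReplicationHandler now derives from WithContext pinned to the global context, instead of keeping a raw ContextPtr member. Presumably this is because the handler schedules long-lived background tasks (startup, consumer, cleanup) that should not keep a query-scoped context alive. A rough Python analogue of the pattern (hypothetical names):

    class WithContext:
        # Hold a long-lived context; subclasses use get_context().
        def __init__(self, context):
            self._context = context

        def get_context(self):
            return self._context

    class PostgreSQLReplicationHandler(WithContext):
        def __init__(self, context):
            # Pin the global context, not the shorter-lived query context.
            super().__init__(context.get_global_context())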

View File

@ -13,7 +13,7 @@ namespace DB
class StorageMaterializedPostgreSQL;
struct SettingChange;
class PostgreSQLReplicationHandler
class PostgreSQLReplicationHandler : WithContext
{
friend class TemporaryReplicationSlot;
@ -98,7 +98,6 @@ private:
std::pair<String, String> getSchemaAndTableName(const String & table_name) const;
Poco::Logger * log;
ContextPtr context;
/// If it is not attach, i.e. a create query, then if publication already exists - always drop it.
bool is_attach;

View File

@ -1312,10 +1312,14 @@ void StorageReplicatedMergeTree::checkPartChecksumsAndAddCommitOps(const zkutil:
if (replica_part_header.getColumnsHash() != local_part_header.getColumnsHash())
{
/// Either it's a bug or ZooKeeper contains broken data.
/// TODO Fix KILL MUTATION and replace CHECKSUM_DOESNT_MATCH with LOGICAL_ERROR
/// (some replicas may skip killed mutation even if it was executed on other replicas)
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Part {} from {} has different columns hash", part_name, replica);
/// Currently there are two (known) cases when it may happen:
/// - A KILL MUTATION query removed the mutation before all replicas had executed the assigned MUTATE_PART entries.
/// Some replicas may skip this mutation and update the part version without actually applying any changes.
/// It leads to mismatching checksums if the changes were applied on other replicas.
/// - ALTER_METADATA and MERGE_PARTS were reordered on some replicas.
/// It may lead to a different number of columns in merged parts on these replicas.
throw Exception(ErrorCodes::CHECKSUM_DOESNT_MATCH, "Part {} from {} has different columns hash "
"(it may rarely happen on race condition with KILL MUTATION or ALTER COLUMN).", part_name, replica);
}
replica_part_header.getChecksums().checkEqual(local_part_header.getChecksums(), true);

View File

@ -51,6 +51,7 @@ StorageS3Cluster::StorageS3Cluster(
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
@ -63,11 +64,26 @@ StorageS3Cluster::StorageS3Cluster(
, format_name(format_name_)
, compression_method(compression_method_)
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(Poco::URI{filename});
StorageInMemoryMetadata storage_metadata;
storage_metadata.setColumns(columns_);
StorageS3::updateClientAndAuthSettings(context_, client_auth);
if (columns_.empty())
{
const bool is_key_with_globs = filename.find_first_of("*?{") != std::string::npos;
/// `distributed_processing` is set to false, because this code is executed on the initiator, so there is no callback set
/// for asking for the next tasks.
/// `format_settings` is set to std::nullopt, because StorageS3Cluster is used only as table function
auto columns = StorageS3::getTableStructureFromDataImpl(format_name, client_auth, max_single_read_retries_, compression_method,
/*distributed_processing_*/false, is_key_with_globs, /*format_settings=*/std::nullopt, context_);
storage_metadata.setColumns(columns);
}
else
storage_metadata.setColumns(columns_);
storage_metadata.setConstraints(constraints_);
setInMemoryMetadata(storage_metadata);
StorageS3::updateClientAndAuthSettings(context_, client_auth);
}
/// The code executes on initiator
@ -83,7 +99,6 @@ Pipe StorageS3Cluster::read(
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto cluster = context->getCluster(cluster_name)->getClusterWithReplicasAsShards(context->getSettingsRef());
StorageS3::updateClientAndAuthSettings(context, client_auth);
auto iterator = std::make_shared<StorageS3Source::DisclosedGlobIterator>(*client_auth.client, client_auth.uri);
auto callback = std::make_shared<StorageS3Source::IteratorWrapper>([iterator]() mutable -> String
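
The constructor change makes the cluster variant infer the table structure from the data when no column list is given, mirroring plain StorageS3. Two details stand out: globs are detected by scanning the key for *, ? or {, and inference runs with distributed_processing=false because it happens on the initiator, where no task callback is installed. In outline (Python, illustrative names):

    def resolve_columns(requested_columns, key, infer_structure):
        if requested_columns:
            return requested_columns
        # Same test as filename.find_first_of("*?{") != npos above.
        is_key_with_globs = any(c in "*?{" for c in key)
        # Initiator-side inference: no callback for fetching the next task.
        return infer_structure(is_key_with_globs, distributed_processing=False)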

View File

@ -41,6 +41,7 @@ protected:
const StorageID & table_id_,
String cluster_name_,
const String & format_name_,
UInt64 max_single_read_retries_,
UInt64 max_connections_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,

View File

@ -22,51 +22,29 @@ namespace ErrorCodes
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context)
/// This is needed to avoid copy-paste: s3Cluster arguments differ only in one additional (first) argument - the cluster name
void TableFunctionS3::parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & s3_configuration)
{
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - url\n"
" - url, format\n" \
" - url, format, structure\n" \
" - url, access_key_id, secret_access_key\n" \
" - url, format, structure, compression_method\n" \
" - url, access_key_id, secret_access_key, format\n"
" - url, access_key_id, secret_access_key, format, structure\n" \
" - url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTs & args = args_func.at(0)->children;
StorageS3Configuration configuration;
if (auto named_collection = getURLBasedDataSourceConfiguration(args, context))
{
auto [common_configuration, storage_specific_args] = named_collection.value();
configuration.set(common_configuration);
s3_configuration.set(common_configuration);
for (const auto & [arg_name, arg_value] : storage_specific_args)
{
if (arg_name == "access_key_id")
configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
s3_configuration.access_key_id = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else if (arg_name == "secret_access_key")
configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
s3_configuration.secret_access_key = arg_value->as<ASTLiteral>()->value.safeGet<String>();
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Unknown key-value argument `{}` for StorageS3, expected: "
"url, [access_key_id, secret_access_key], name of used format, structure and [compression_method].",
arg_name);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
}
}
else
{
if (args.empty() || args.size() > 6)
throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, error_message);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
@ -110,53 +88,76 @@ void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr con
}
/// This argument is always the first
configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.url = args[0]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("format"))
configuration.format = args[args_to_idx["format"]]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.format = args[args_to_idx["format"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("structure"))
configuration.structure = args[args_to_idx["structure"]]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.structure = args[args_to_idx["structure"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("compression_method"))
configuration.compression_method = args[args_to_idx["compression_method"]]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.compression_method = args[args_to_idx["compression_method"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("access_key_id"))
configuration.access_key_id = args[args_to_idx["access_key_id"]]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.access_key_id = args[args_to_idx["access_key_id"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("secret_access_key"))
configuration.secret_access_key = args[args_to_idx["secret_access_key"]]->as<ASTLiteral &>().value.safeGet<String>();
s3_configuration.secret_access_key = args[args_to_idx["secret_access_key"]]->as<ASTLiteral &>().value.safeGet<String>();
}
if (configuration.format == "auto")
configuration.format = FormatFactory::instance().getFormatFromFileName(configuration.url, true);
if (s3_configuration.format == "auto")
s3_configuration.format = FormatFactory::instance().getFormatFromFileName(s3_configuration.url, true);
}
s3_configuration = std::move(configuration);
void TableFunctionS3::parseArguments(const ASTPtr & ast_function, ContextPtr context)
{
/// Parse args
ASTs & args_func = ast_function->children;
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - url\n" \
" - url, format\n" \
" - url, format, structure\n" \
" - url, access_key_id, secret_access_key\n" \
" - url, format, structure, compression_method\n" \
" - url, access_key_id, secret_access_key, format\n" \
" - url, access_key_id, secret_access_key, format, structure\n" \
" - url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
if (args_func.size() != 1)
throw Exception("Table function '" + getName() + "' must have arguments.", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
auto & args = args_func.at(0)->children;
parseArgumentsImpl(message, args, context, configuration);
}
ColumnsDescription TableFunctionS3::getActualTableStructure(ContextPtr context) const
{
if (s3_configuration->structure == "auto")
if (configuration.structure == "auto")
{
return StorageS3::getTableStructureFromData(
s3_configuration->format,
S3::URI(Poco::URI(s3_configuration->url)),
s3_configuration->access_key_id,
s3_configuration->secret_access_key,
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.access_key_id,
configuration.secret_access_key,
context->getSettingsRef().s3_max_connections,
context->getSettingsRef().s3_max_single_read_retries,
s3_configuration->compression_method,
configuration.compression_method,
false,
std::nullopt,
context);
}
return parseColumnsListFromString(s3_configuration->structure, context);
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, ContextPtr context, const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
Poco::URI uri (s3_configuration->url);
Poco::URI uri (configuration.url);
S3::URI s3_uri (uri);
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
@ -166,17 +167,17 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
ColumnsDescription columns;
if (s3_configuration->structure != "auto")
columns = parseColumnsListFromString(s3_configuration->structure, context);
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
else if (!structure_hint.empty())
columns = structure_hint;
StoragePtr storage = StorageS3::create(
s3_uri,
s3_configuration->access_key_id,
s3_configuration->secret_access_key,
configuration.access_key_id,
configuration.secret_access_key,
StorageID(getDatabaseName(), table_name),
s3_configuration->format,
configuration.format,
max_single_read_retries,
min_upload_part_size,
upload_part_size_multiply_factor,
@ -189,7 +190,7 @@ StoragePtr TableFunctionS3::executeImpl(const ASTPtr & /*ast_function*/, Context
context,
/// No format_settings for table function S3
std::nullopt,
s3_configuration->compression_method);
configuration.compression_method);
storage->startup();
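
The net effect of this refactoring: argument parsing lives in one place, parseArgumentsImpl, which writes straight into the member configuration instead of a local copy, and s3Cluster can reuse it after peeling off its extra first argument (see the next two files). In outline (Python, hypothetical names):

    def parse_s3_arguments_impl(error_message, args, config):
        # Shared by s3() and s3Cluster(): fills config in place.
        if not args or len(args) > 6:
            raise ValueError(error_message)
        config.url = args[0]
        # ... format / structure / credentials resolved by signature size ...

    def parse_s3_cluster_arguments(args, config):
        config.cluster_name = args[0]
        # Reuse the single parser on everything after the cluster name.
        parse_s3_arguments_impl(SIGNATURES_MESSAGE, args[1:], config)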

View File

@ -12,6 +12,7 @@ namespace DB
{
class Context;
class TableFunctionS3Cluster;
/* s3(source, [access_key_id, secret_access_key,] format, structure[, compression]) - creates a temporary storage for a file in S3.
*/
@ -23,13 +24,15 @@ public:
{
return name;
}
bool hasStaticStructure() const override { return s3_configuration->structure != "auto"; }
bool hasStaticStructure() const override { return configuration.structure != "auto"; }
bool needStructureHint() const override { return s3_configuration->structure == "auto"; }
bool needStructureHint() const override { return configuration.structure == "auto"; }
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
protected:
friend class TableFunctionS3Cluster;
StoragePtr executeImpl(
const ASTPtr & ast_function,
ContextPtr context,
@ -41,7 +44,9 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr context) const override;
void parseArguments(const ASTPtr & ast_function, ContextPtr context) override;
std::optional<StorageS3Configuration> s3_configuration;
static void parseArgumentsImpl(const String & error_message, ASTs & args, ContextPtr context, StorageS3Configuration & configuration);
StorageS3Configuration configuration;
ColumnsDescription structure_hint;
};

View File

@ -32,6 +32,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_GET;
}
@ -45,55 +46,58 @@ void TableFunctionS3Cluster::parseArguments(const ASTPtr & ast_function, Context
ASTs & args = args_func.at(0)->children;
for (auto & arg : args)
arg = evaluateConstantExpressionAsLiteral(arg, context);
const auto message = fmt::format(
"The signature of table function {} could be the following:\n" \
" - cluster, url\n"
" - cluster, url, format\n" \
" - cluster, url, format, structure\n" \
" - cluster, url, access_key_id, secret_access_key\n" \
" - cluster, url, format, structure, compression_method\n" \
" - cluster, url, access_key_id, secret_access_key, format\n"
" - cluster, url, access_key_id, secret_access_key, format, structure\n" \
" - cluster, url, access_key_id, secret_access_key, format, structure, compression_method",
getName());
if (args.size() < 4 || args.size() > 7)
if (args.size() < 2 || args.size() > 7)
throw Exception(message, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);
/// These arguments are always the first
cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
filename = args[1]->as<ASTLiteral &>().value.safeGet<String>();
configuration.cluster_name = args[0]->as<ASTLiteral &>().value.safeGet<String>();
/// Size -> argument indexes
static auto size_to_args = std::map<size_t, std::map<String, size_t>>
{
{4, {{"format", 2}, {"structure", 3}}},
{5, {{"format", 2}, {"structure", 3}, {"compression_method", 4}}},
{6, {{"access_key_id", 2}, {"secret_access_key", 3}, {"format", 4}, {"structure", 5}}},
{7, {{"access_key_id", 2}, {"secret_access_key", 3}, {"format", 4}, {"structure", 5}, {"compression_method", 6}}}
};
if (!context->tryGetCluster(configuration.cluster_name))
throw Exception(ErrorCodes::BAD_GET, "Requested cluster '{}' not found", configuration.cluster_name);
auto & args_to_idx = size_to_args[args.size()];
/// Just cut the first arg (cluster_name) and try to parse s3 table function arguments as is
ASTs clipped_args;
clipped_args.reserve(args.size());
std::copy(args.begin() + 1, args.end(), std::back_inserter(clipped_args));
if (args_to_idx.contains("format"))
format = args[args_to_idx["format"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("structure"))
structure = args[args_to_idx["structure"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("compression_method"))
compression_method = args[args_to_idx["compression_method"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("access_key_id"))
access_key_id = args[args_to_idx["access_key_id"]]->as<ASTLiteral &>().value.safeGet<String>();
if (args_to_idx.contains("secret_access_key"))
secret_access_key = args[args_to_idx["secret_access_key"]]->as<ASTLiteral &>().value.safeGet<String>();
/// StorageS3ClusterConfiguration inherits from StorageS3Configuration, so it is safe to upcast it.
TableFunctionS3::parseArgumentsImpl(message, clipped_args, context, static_cast<StorageS3Configuration &>(configuration));
}
ColumnsDescription TableFunctionS3Cluster::getActualTableStructure(ContextPtr context) const
{
return parseColumnsListFromString(structure, context);
if (configuration.structure == "auto")
{
return StorageS3::getTableStructureFromData(
configuration.format,
S3::URI(Poco::URI(configuration.url)),
configuration.access_key_id,
configuration.secret_access_key,
context->getSettingsRef().s3_max_connections,
context->getSettingsRef().s3_max_single_read_retries,
configuration.compression_method,
false,
std::nullopt,
context);
}
return parseColumnsListFromString(configuration.structure, context);
}
StoragePtr TableFunctionS3Cluster::executeImpl(
@ -101,46 +105,60 @@ StoragePtr TableFunctionS3Cluster::executeImpl(
const std::string & table_name, ColumnsDescription /*cached_columns*/) const
{
StoragePtr storage;
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 upload_part_size_multiply_factor = context->getSettingsRef().s3_upload_part_size_multiply_factor;
UInt64 upload_part_size_multiply_parts_count_threshold = context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
ColumnsDescription columns;
if (configuration.structure != "auto")
columns = parseColumnsListFromString(configuration.structure, context);
else if (!structure_hint.empty())
columns = structure_hint;
if (context->getClientInfo().query_kind == ClientInfo::QueryKind::SECONDARY_QUERY)
{
/// On a worker node this filename won't contain globs
Poco::URI uri (filename);
Poco::URI uri (configuration.url);
S3::URI s3_uri (uri);
/// Actually these parameters are not used
UInt64 max_single_read_retries = context->getSettingsRef().s3_max_single_read_retries;
UInt64 min_upload_part_size = context->getSettingsRef().s3_min_upload_part_size;
UInt64 upload_part_size_multiply_factor = context->getSettingsRef().s3_upload_part_size_multiply_factor;
UInt64 upload_part_size_multiply_parts_count_threshold = context->getSettingsRef().s3_upload_part_size_multiply_parts_count_threshold;
UInt64 max_single_part_upload_size = context->getSettingsRef().s3_max_single_part_upload_size;
UInt64 max_connections = context->getSettingsRef().s3_max_connections;
storage = StorageS3::create(
s3_uri,
access_key_id,
secret_access_key,
configuration.access_key_id,
configuration.secret_access_key,
StorageID(getDatabaseName(), table_name),
format,
configuration.format,
max_single_read_retries,
min_upload_part_size,
upload_part_size_multiply_factor,
upload_part_size_multiply_parts_count_threshold,
max_single_part_upload_size,
max_connections,
getActualTableStructure(context),
columns,
ConstraintsDescription{},
String{},
context,
// No format_settings for S3Cluster
std::nullopt,
compression_method,
configuration.compression_method,
/*distributed_processing=*/true);
}
else
{
storage = StorageS3Cluster::create(
filename, access_key_id, secret_access_key, StorageID(getDatabaseName(), table_name),
cluster_name, format, context->getSettingsRef().s3_max_connections,
getActualTableStructure(context), ConstraintsDescription{},
context, compression_method);
configuration.url,
configuration.access_key_id,
configuration.secret_access_key,
StorageID(getDatabaseName(), table_name),
configuration.cluster_name, configuration.format,
max_single_read_retries,
max_connections,
columns,
ConstraintsDescription{},
context,
configuration.compression_method);
}
storage->startup();
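
executeImpl now branches on the query kind: on a worker node (SECONDARY_QUERY) the URL no longer contains globs, so a plain StorageS3 with distributed_processing=true is created, while the initiator creates StorageS3Cluster to fan the glob out over the cluster replicas. Roughly (Python, illustrative names):

    def execute_impl(query_kind, configuration, columns, context):
        if query_kind == "SECONDARY_QUERY":
            # Worker: the initiator hands out concrete keys, no globs left.
            return create_storage_s3(configuration, columns, context,
                                     distributed_processing=True)
        # Initiator: distributes glob expansion across cluster replicas.
        return create_storage_s3_cluster(configuration, columns, context)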

View File

@ -5,6 +5,7 @@
#if USE_AWS_S3
#include <TableFunctions/ITableFunction.h>
#include <Storages/ExternalDataSourceConfiguration.h>
namespace DB
@ -28,7 +29,12 @@ public:
{
return name;
}
bool hasStaticStructure() const override { return true; }
bool hasStaticStructure() const override { return configuration.structure != "auto"; }
bool needStructureHint() const override { return configuration.structure == "auto"; }
void setStructureHint(const ColumnsDescription & structure_hint_) override { structure_hint = structure_hint_; }
protected:
StoragePtr executeImpl(
@ -42,13 +48,8 @@ protected:
ColumnsDescription getActualTableStructure(ContextPtr) const override;
void parseArguments(const ASTPtr &, ContextPtr) override;
String cluster_name;
String filename;
String format;
String structure;
String access_key_id;
String secret_access_key;
String compression_method = "auto";
StorageS3ClusterConfiguration configuration;
ColumnsDescription structure_hint;
};
}

View File

@ -148,6 +148,17 @@ if __name__ == "__main__":
build_name,
)
some_builds_are_missing = len(build_reports_map) < len(reports_order)
if some_builds_are_missing:
logging.info(
"Expected to get %s build results, got %s",
len(reports_order),
len(build_reports_map),
)
else:
logging.info("Got exactly %s builds", len(build_reports_map))
build_reports = [
build_reports_map[build_name]
for build_name in reports_order
@ -219,10 +230,10 @@ if __name__ == "__main__":
if build_result.status == "success":
ok_builds += 1
if ok_builds == 0:
if ok_builds == 0 or some_builds_are_missing:
summary_status = "error"
description = "{}/{} builds are OK".format(ok_builds, total_builds)
description = f"{ok_builds}/{total_builds} builds are OK"
print("::notice ::Report url: {}".format(url))

View File

@ -206,6 +206,7 @@ CI_CONFIG = {
"binary_freebsd",
"binary_darwin_aarch64",
"binary_ppc64le",
"binary_gcc",
],
},
"tests_config": {

View File

@ -8,30 +8,16 @@ from get_robot_token import get_parameter_from_ssm
class ClickHouseHelper:
def __init__(self, url=None, user=None, password=None):
self.url2 = None
self.auth2 = None
def __init__(self, url=None):
if url is None:
url = get_parameter_from_ssm("clickhouse-test-stat-url")
self.url2 = get_parameter_from_ssm("clickhouse-test-stat-url2")
self.auth2 = {
self.url = get_parameter_from_ssm("clickhouse-test-stat-url2")
self.auth = {
"X-ClickHouse-User": get_parameter_from_ssm(
"clickhouse-test-stat-login2"
),
"X-ClickHouse-Key": "",
}
self.url = url
self.auth = {
"X-ClickHouse-User": user
if user is not None
else get_parameter_from_ssm("clickhouse-test-stat-login"),
"X-ClickHouse-Key": password
if password is not None
else get_parameter_from_ssm("clickhouse-test-stat-password"),
}
@staticmethod
def _insert_json_str_info_impl(url, auth, db, table, json_str):
params = {
@ -78,8 +64,6 @@ class ClickHouseHelper:
def _insert_json_str_info(self, db, table, json_str):
self._insert_json_str_info_impl(self.url, self.auth, db, table, json_str)
if self.url2:
self._insert_json_str_info_impl(self.url2, self.auth2, db, table, json_str)
def insert_event_into(self, db, table, event):
event_str = json.dumps(event)

View File

@ -379,12 +379,16 @@ def check_need_to_rerun(workflow_description):
def rerun_workflow(workflow_description, token):
print("Going to rerun workflow")
_exec_post_with_retry(workflow_description.rerun_url, token)
try:
_exec_post_with_retry(f"{workflow_description.rerun_url}-failed-jobs", token)
except Exception:
_exec_post_with_retry(workflow_description.rerun_url, token)
def main(event):
token = get_token_from_aws()
event_data = json.loads(event["body"])
print("The body received:", event_data)
workflow_description = get_workflow_description_from_event(event_data)
print("Got workflow description", workflow_description)

View File

@ -374,7 +374,7 @@ class SettingsRandomizer:
"output_format_parallel_formatting": lambda: random.randint(0, 1),
"input_format_parallel_parsing": lambda: random.randint(0, 1),
"min_chunk_bytes_for_parallel_parsing": lambda: max(1024, int(random.gauss(10 * 1024 * 1024, 5 * 1000 * 1000))),
"max_read_buffer_size": lambda: random.randint(1, 20) if random.random() < 0.1 else random.randint(500000, 1048576),
"max_read_buffer_size": lambda: random.randint(500000, 1048576),
"prefer_localhost_replica": lambda: random.randint(0, 1),
"max_block_size": lambda: random.randint(8000, 100000),
"max_threads": lambda: random.randint(1, 64),
@ -468,9 +468,11 @@ class TestCase:
return testcase_args
def add_random_settings(self, client_options):
def add_random_settings(self, args, client_options):
if self.tags and 'no-random-settings' in self.tags:
return client_options
if args.no_random_settings:
return client_options
if len(self.base_url_params) == 0:
os.environ['CLICKHOUSE_URL_PARAMS'] = '&'.join(self.random_settings)
@ -485,9 +487,11 @@ class TestCase:
os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params
os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options
def add_info_about_settings(self, description):
def add_info_about_settings(self, args, description):
if self.tags and 'no-random-settings' in self.tags:
return description
if args.no_random_settings:
return description
return description + "\n" + "Settings used in the test: " + "--" + " --".join(self.random_settings) + "\n"
@ -788,13 +792,13 @@ class TestCase:
self.runs_count += 1
self.testcase_args = self.configure_testcase_args(args, self.case_file, suite.suite_tmp_path)
client_options = self.add_random_settings(client_options)
client_options = self.add_random_settings(args, client_options)
proc, stdout, stderr, total_time = self.run_single_test(server_logs_level, client_options)
result = self.process_result_impl(proc, stdout, stderr, total_time)
result.check_if_need_retry(args, stdout, stderr, self.runs_count)
if result.status == TestStatus.FAIL:
result.description = self.add_info_about_settings(result.description)
result.description = self.add_info_about_settings(args, result.description)
return result
except KeyboardInterrupt as e:
raise e
@ -802,12 +806,12 @@ class TestCase:
return TestResult(self.name, TestStatus.FAIL,
FailureReason.INTERNAL_QUERY_FAIL,
0.,
self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
self.add_info_about_settings(args, self.get_description_from_exception_info(sys.exc_info())))
except (ConnectionRefusedError, ConnectionResetError):
return TestResult(self.name, TestStatus.FAIL,
FailureReason.SERVER_DIED,
0.,
self.add_info_about_settings(self.get_description_from_exception_info(sys.exc_info())))
self.add_info_about_settings(args, self.get_description_from_exception_info(sys.exc_info())))
except:
return TestResult(self.name, TestStatus.UNKNOWN,
FailureReason.INTERNAL_ERROR,
@ -1501,6 +1505,7 @@ if __name__ == '__main__':
parser.add_argument('--print-time', action='store_true', dest='print_time', help='Print test time')
parser.add_argument('--check-zookeeper-session', action='store_true', help='Check ZooKeeper session uptime to determine if failed test should be retried')
parser.add_argument('--s3-storage', action='store_true', default=False, help='Run tests over s3 storage')
parser.add_argument('--no-random-settings', action='store_true', default=False, help='Disable settings randomization')
parser.add_argument('--run-by-hash-num', type=int, help='Run tests matching crc32(test_name) % run_by_hash_total == run_by_hash_num')
parser.add_argument('--run-by-hash-total', type=int, help='Total test groups for crc32(test_name) % run_by_hash_total == run_by_hash_num')
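
With the new flag, randomization is skipped either per test (the no-random-settings tag) or globally (--no-random-settings); the gate added to both add_random_settings and add_info_about_settings reduces to:

    def randomization_disabled(tags, args) -> bool:
        # Per-test opt-out via tag, or a global opt-out via the CLI flag.
        return bool(tags and 'no-random-settings' in tags) or args.no_random_settings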

View File

@ -14,5 +14,13 @@
<user>default</user>
<table>s</table>
</clickhouse_dictionary>
<url_with_headers>
<headers>
<header>
<name>X-ClickHouse-Format</name>
<value>JSONEachRow</value>
</header>
</headers>
</url_with_headers>
</named_collections>
</clickhouse>

View File

@ -162,7 +162,7 @@ def test_put(started_cluster, maybe_auth, positive, compression):
values_csv = "1,2,3\n3,2,1\n78,43,45\n"
filename = "test.csv"
put_query = f"""insert into table function s3('http://{started_cluster.minio_ip}:{started_cluster.minio_port}/{bucket}/{filename}',
{maybe_auth}'CSV', '{table_format}', {compression}) values settings s3_truncate_on_insert=1 {values}"""
{maybe_auth}'CSV', '{table_format}', '{compression}') values settings s3_truncate_on_insert=1 {values}"""
try:
run_query(instance, put_query)

View File

@ -19,11 +19,11 @@ function pack_unpack_compare()
${CLICKHOUSE_CLIENT} --query "CREATE TABLE buf_00385 ENGINE = Memory AS $1"
local res_orig
res_orig=$(${CLICKHOUSE_CLIENT} --max_threads=1 --query "SELECT $TABLE_HASH FROM buf_00385")
res_orig=$(${CLICKHOUSE_CLIENT} --max_block_size=65505 --max_threads=1 --query "SELECT $TABLE_HASH FROM buf_00385")
${CLICKHOUSE_CLIENT} --max_threads=1 --query "CREATE TABLE buf_file ENGINE = File($3) AS SELECT * FROM buf_00385"
local res_db_file
res_db_file=$(${CLICKHOUSE_CLIENT} --max_threads=1 --query "SELECT $TABLE_HASH FROM buf_file")
res_db_file=$(${CLICKHOUSE_CLIENT} --max_block_size=65505 --max_threads=1 --query "SELECT $TABLE_HASH FROM buf_file")
${CLICKHOUSE_CLIENT} --max_threads=1 --query "SELECT * FROM buf_00385 FORMAT $3" > "$buf_file"
local res_ch_local1

View File

@ -34,7 +34,7 @@ CREATE TABLE test_01037.polygons_array
ENGINE = Memory;
"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_01037.polygons_array FORMAT JSONEachRow" --max_insert_block_size=100000 < "${CURDIR}/01037_polygon_data"
$CLICKHOUSE_CLIENT --query="INSERT INTO test_01037.polygons_array FORMAT JSONEachRow" --min_chunk_bytes_for_parallel_parsing=10485760 --max_insert_block_size=100000 < "${CURDIR}/01037_polygon_data"
rm "${CURDIR}"/01037_polygon_data

View File

@ -16,10 +16,10 @@ do
${CLICKHOUSE_CLIENT} --query "DROP TABLE file"
done
${CLICKHOUSE_CLIENT} --query "SELECT count(), max(x) FROM file('${CLICKHOUSE_DATABASE}/{gz,br,xz,zst,lz4,bz2}.tsv.{gz,br,xz,zst,lz4,bz2}', TSV, 'x UInt64')"
${CLICKHOUSE_CLIENT} --max_read_buffer_size=1048576 --query "SELECT count(), max(x) FROM file('${CLICKHOUSE_DATABASE}/{gz,br,xz,zst,lz4,bz2}.tsv.{gz,br,xz,zst,lz4,bz2}', TSV, 'x UInt64')"
for m in gz br xz zst lz4 bz2
do
${CLICKHOUSE_CLIENT} --query "SELECT count() < 4000000, max(x) FROM file('${CLICKHOUSE_DATABASE}/${m}.tsv.${m}', RowBinary, 'x UInt8', 'none')"
${CLICKHOUSE_CLIENT} --max_read_buffer_size=1048576 --query "SELECT count() < 4000000, max(x) FROM file('${CLICKHOUSE_DATABASE}/${m}.tsv.${m}', RowBinary, 'x UInt8', 'none')"
done

View File

@ -1,5 +1,6 @@
set log_queries=1;
set log_query_threads=1;
set max_threads=0;
WITH 01091 AS id SELECT 1;
SYSTEM FLUSH LOGS;

View File

@ -38,27 +38,17 @@ function restart_replicas_loop()
done
sleep 0.$RANDOM
}
function restart_thread_1()
{
restart_replicas_loop
}
function restart_thread_2()
{
restart_replicas_loop
}
export -f rename_thread_1
export -f rename_thread_2
export -f restart_thread_1
export -f restart_thread_2
export -f restart_replicas_loop
TIMEOUT=10
clickhouse_client_loop_timeout $TIMEOUT rename_thread_1 2> /dev/null &
clickhouse_client_loop_timeout $TIMEOUT rename_thread_2 2> /dev/null &
clickhouse_client_loop_timeout $TIMEOUT restart_thread_1 2> /dev/null &
clickhouse_client_loop_timeout $TIMEOUT restart_thread_2 2> /dev/null &
clickhouse_client_loop_timeout $TIMEOUT restart_replicas_loop 2> /dev/null &
clickhouse_client_loop_timeout $TIMEOUT restart_replicas_loop 2> /dev/null &
wait

View File

@ -41,4 +41,13 @@ timeout $TIMEOUT bash -c show_processes_func &
wait
# otherwise it can still be alive after the test
query_alive=$($CLICKHOUSE_CLIENT --query "SELECT count() FROM system.processes WHERE query ILIKE 'SELECT * FROM numbers(600000000)%'")
while [[ $query_alive != 0 ]]
do
$CLICKHOUSE_CLIENT -q "KILL QUERY WHERE query ilike '%SELECT * FROM numbers(600000000)%'" 2> /dev/null 1> /dev/null
sleep 0.5
query_alive=$($CLICKHOUSE_CLIENT --query "SELECT count() FROM system.processes WHERE query ILIKE 'SELECT * FROM numbers(600000000)%'")
done
echo "Test OK"

View File

@ -20,7 +20,8 @@ function thread()
REPLICA=$1
ITERATIONS=$2
$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * $NUM_REPLICAS + $REPLICA FROM numbers($ITERATIONS)"
# It's legal to fetch something before the insert has finished
$CLICKHOUSE_CLIENT --max_block_size 1 --min_insert_block_size_rows 0 --min_insert_block_size_bytes 0 --query "INSERT INTO r$REPLICA SELECT number * $NUM_REPLICAS + $REPLICA FROM numbers($ITERATIONS)" 2>&1 | grep -v -F "Tried to commit obsolete part"
}
for REPLICA in $SEQ; do

View File

@ -1,5 +1,8 @@
drop table if exists tsv;
set output_format_parallel_formatting=1;
set max_read_buffer_size=1048576;
set max_block_size=65505;
create table tsv(a int, b int default 7) engine File(TSV);
insert into tsv(a) select number from numbers(10000000);

View File

@ -8,6 +8,6 @@ CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. "$CURDIR"/../shell_config.sh
for _ in {1..10}; do
${CLICKHOUSE_CLIENT} --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" |& grep -v -e 'Received exception from server' -e 'Code: 241' -e '(query: '
${CLICKHOUSE_CLIENT} --max_block_size=65505 --max_memory_usage '10G' --query "SELECT i FROM generateRandom('i Array(Int8)', 1, 1, 1048577) LIMIT 65536" |& grep -v -e 'Received exception from server' -e 'Code: 241' -e '(query: '
done
exit 0

View File

@ -9,8 +9,8 @@ CREATE TABLE default.merge_tree_pk_sql\n(\n `key` UInt64,\n `value` String
1 c
2 b
1 c 0
2 e 555
2 b 0
2 e 555
CREATE TABLE default.merge_tree_pk_sql\n(\n `key` UInt64,\n `value` String,\n `key2` UInt64\n)\nENGINE = ReplacingMergeTree\nPRIMARY KEY key\nORDER BY (key, key2)\nSETTINGS index_granularity = 8192
CREATE TABLE default.replicated_merge_tree_pk_sql\n(\n `key` UInt64,\n `value` String\n)\nENGINE = ReplicatedReplacingMergeTree(\'/clickhouse/test/01532_primary_key_without\', \'r1\')\nPRIMARY KEY key\nORDER BY key\nSETTINGS index_granularity = 8192
1 a
@ -18,6 +18,6 @@ CREATE TABLE default.replicated_merge_tree_pk_sql\n(\n `key` UInt64,\n `va
1 c
2 b
1 c 0
2 e 555
2 b 0
2 e 555
CREATE TABLE default.replicated_merge_tree_pk_sql\n(\n `key` UInt64,\n `value` String,\n `key2` UInt64\n)\nENGINE = ReplicatedReplacingMergeTree(\'/clickhouse/test/01532_primary_key_without\', \'r1\')\nPRIMARY KEY key\nORDER BY (key, key2)\nSETTINGS index_granularity = 8192

View File

@ -1,4 +1,4 @@
-- Tags: zookeeper
-- Tags: zookeeper, no-parallel
DROP TABLE IF EXISTS merge_tree_pk;
@ -15,14 +15,14 @@ SHOW CREATE TABLE merge_tree_pk;
INSERT INTO merge_tree_pk VALUES (1, 'a');
INSERT INTO merge_tree_pk VALUES (2, 'b');
SELECT * FROM merge_tree_pk ORDER BY key;
SELECT * FROM merge_tree_pk ORDER BY key, value;
INSERT INTO merge_tree_pk VALUES (1, 'c');
DETACH TABLE merge_tree_pk;
ATTACH TABLE merge_tree_pk;
SELECT * FROM merge_tree_pk FINAL ORDER BY key;
SELECT * FROM merge_tree_pk FINAL ORDER BY key, value;
DROP TABLE IF EXISTS merge_tree_pk;
@ -41,14 +41,14 @@ SHOW CREATE TABLE merge_tree_pk_sql;
INSERT INTO merge_tree_pk_sql VALUES (1, 'a');
INSERT INTO merge_tree_pk_sql VALUES (2, 'b');
SELECT * FROM merge_tree_pk_sql ORDER BY key;
SELECT * FROM merge_tree_pk_sql ORDER BY key, value;
INSERT INTO merge_tree_pk_sql VALUES (1, 'c');
DETACH TABLE merge_tree_pk_sql;
ATTACH TABLE merge_tree_pk_sql;
SELECT * FROM merge_tree_pk_sql FINAL ORDER BY key;
SELECT * FROM merge_tree_pk_sql FINAL ORDER BY key, value;
ALTER TABLE merge_tree_pk_sql ADD COLUMN key2 UInt64, MODIFY ORDER BY (key, key2);
@ -56,7 +56,7 @@ INSERT INTO merge_tree_pk_sql VALUES (2, 'd', 555);
INSERT INTO merge_tree_pk_sql VALUES (2, 'e', 555);
SELECT * FROM merge_tree_pk_sql FINAL ORDER BY key;
SELECT * FROM merge_tree_pk_sql FINAL ORDER BY key, value;
SHOW CREATE TABLE merge_tree_pk_sql;
@ -77,14 +77,14 @@ SHOW CREATE TABLE replicated_merge_tree_pk_sql;
INSERT INTO replicated_merge_tree_pk_sql VALUES (1, 'a');
INSERT INTO replicated_merge_tree_pk_sql VALUES (2, 'b');
SELECT * FROM replicated_merge_tree_pk_sql ORDER BY key;
SELECT * FROM replicated_merge_tree_pk_sql ORDER BY key, value;
INSERT INTO replicated_merge_tree_pk_sql VALUES (1, 'c');
DETACH TABLE replicated_merge_tree_pk_sql;
ATTACH TABLE replicated_merge_tree_pk_sql;
SELECT * FROM replicated_merge_tree_pk_sql FINAL ORDER BY key;
SELECT * FROM replicated_merge_tree_pk_sql FINAL ORDER BY key, value;
ALTER TABLE replicated_merge_tree_pk_sql ADD COLUMN key2 UInt64, MODIFY ORDER BY (key, key2);
@ -92,7 +92,7 @@ INSERT INTO replicated_merge_tree_pk_sql VALUES (2, 'd', 555);
INSERT INTO replicated_merge_tree_pk_sql VALUES (2, 'e', 555);
SELECT * FROM replicated_merge_tree_pk_sql FINAL ORDER BY key;
SELECT * FROM replicated_merge_tree_pk_sql FINAL ORDER BY key, value;
DETACH TABLE replicated_merge_tree_pk_sql;
ATTACH TABLE replicated_merge_tree_pk_sql;

View File

@ -9,7 +9,7 @@ Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_order_by)
SELECT
timestamp,
key
@ -21,7 +21,7 @@ Expression (Projection)
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_order_by)
SELECT
timestamp,
key
@ -35,7 +35,7 @@ Expression (Projection)
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_order_by)
SELECT
timestamp,
key

View File

@ -26,35 +26,35 @@ Expression (Projection)
Sorting (Sorting for ORDER BY)
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
Expression (Projection)
Limit (preliminary LIMIT (without OFFSET))
Sorting
Expression (Before ORDER BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
optimize_aggregation_in_order
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
Expression ((Projection + Before ORDER BY))
Aggregating
Expression (Before GROUP BY)
SettingQuotaAndLimits (Set limits and quota after reading from storage)
ReadFromMergeTree
ReadFromMergeTree (default.test_table)
second-index
1
1

View File

@ -1,3 +1,6 @@
all_1_1_0 1
all_2_2_0 1
201805_1_1_0 1
Wide
Compact
all_1_1_0 1

View File

@ -8,12 +8,20 @@ insert into tp select number, number from numbers(5);
check table tp settings check_query_single_value_result=0;
drop table tp;
create table tp (p Date, k UInt64, v1 UInt64, v2 Int64, projection p1 ( select p, sum(k), sum(v1), sum(v2) group by p) ) engine = MergeTree partition by toYYYYMM(p) order by k settings min_bytes_for_wide_part = 0;
insert into tp (p, k, v1, v2) values ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
check table tp settings check_query_single_value_result=0;
drop table tp;
drop table if exists tp;
CREATE TABLE tp (`p` Date, `k` UInt64, `v1` UInt64, `v2` Int64, PROJECTION p1 ( SELECT p, sum(k), sum(v1), sum(v2) GROUP BY p) ) ENGINE = MergeTree PARTITION BY toYYYYMM(p) ORDER BY k SETTINGS min_bytes_for_wide_part = 0;
INSERT INTO tp (p, k, v1, v2) VALUES ('2018-05-15', 1, 1000, 2000), ('2018-05-16', 2, 3000, 4000), ('2018-05-17', 3, 5000, 6000), ('2018-05-18', 4, 7000, 8000);
CHECK TABLE tp settings check_query_single_value_result=0;
DROP TABLE if exists tp;
create table tp (x int, projection p (select sum(x))) engine = MergeTree order by x settings min_rows_for_wide_part = 2, min_bytes_for_wide_part = 0;
insert into tp values (1), (2), (3), (4);
select part_type from system.parts where database = currentDatabase() and table = 'tp';
select part_type from system.projection_parts where database = currentDatabase() and table = 'tp';
check table tp settings check_query_single_value_result=0;
drop table tp;

Some files were not shown because too many files have changed in this diff