Merge branch 'master' into improve_access_type

This commit is contained in:
taiyang-li 2022-03-29 20:42:57 +08:00
commit 67c3c0be3d
187 changed files with 4197 additions and 517 deletions

View File

@ -210,3 +210,6 @@ CheckOptions:
value: false
- key: performance-move-const-arg.CheckTriviallyCopyableMove
value: false
# Workaround clang-tidy bug: https://github.com/llvm/llvm-project/issues/46097
- key: readability-identifier-naming.TypeTemplateParameterIgnoredRegexp
value: expr-type

View File

@ -360,6 +360,52 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinGCC:
needs: [DockerHubPush]
if: ${{ !contains(github.event.pull_request.labels.*.name, 'pr-documentation') && !contains(github.event.pull_request.labels.*.name, 'pr-doc-fix') }}
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=binary_gcc
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ env.IMAGES_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ env.TEMP_PATH }}/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderDebAsan:
needs: [DockerHubPush]
runs-on: [self-hosted, builder]
@ -918,6 +964,7 @@ jobs:
- BuilderDebRelease
- BuilderDebAarch64
- BuilderBinRelease
- BuilderBinGCC
- BuilderDebAsan
- BuilderDebTsan
- BuilderDebUBsan
@ -2608,6 +2655,40 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsReleaseGCC:
needs: [BuilderBinGCC]
runs-on: [self-hosted, fuzzer-unit-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-gcc, actions)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Unit test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 unit_tests_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, fuzzer-unit-tester]

View File

@ -370,6 +370,48 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderBinGCC:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/build_check
IMAGES_PATH=${{runner.temp}}/images_path
REPO_COPY=${{runner.temp}}/build_check/ClickHouse
CACHES_PATH=${{runner.temp}}/../ccaches
CHECK_NAME=ClickHouse build check (actions)
BUILD_NAME=binary_gcc
EOF
- name: Download changed images
uses: actions/download-artifact@v2
with:
name: changed_images
path: ${{ runner.temp }}/images_path
- name: Check out repository code
uses: actions/checkout@v2
with:
submodules: 'true'
fetch-depth: 0 # otherwise we will have no info about contributors
- name: Build
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci" && python3 build_check.py "$CHECK_NAME" "$BUILD_NAME"
- name: Upload build URLs to artifacts
if: ${{ success() || failure() }}
uses: actions/upload-artifact@v2
with:
name: ${{ env.BUILD_NAME }}
path: ${{ runner.temp }}/build_check/${{ env.BUILD_NAME }}.json
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
BuilderDebAarch64:
needs: [DockerHubPush, FastTest]
runs-on: [self-hosted, builder]
@ -963,6 +1005,7 @@ jobs:
- BuilderDebRelease
- BuilderDebAarch64
- BuilderBinRelease
- BuilderBinGCC
- BuilderDebAsan
- BuilderDebTsan
- BuilderDebUBsan
@ -2808,6 +2851,40 @@ jobs:
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsReleaseGCC:
needs: [BuilderBinGCC]
runs-on: [self-hosted, fuzzer-unit-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/unit_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Unit tests (release-gcc, actions)
REPO_COPY=${{runner.temp}}/unit_tests_asan/ClickHouse
EOF
- name: Download json reports
uses: actions/download-artifact@v2
with:
path: ${{ env.REPORTS_PATH }}
- name: Clear repository
run: |
sudo rm -fr "$GITHUB_WORKSPACE" && mkdir "$GITHUB_WORKSPACE"
- name: Check out repository code
uses: actions/checkout@v2
- name: Unit test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 unit_tests_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker kill "$(docker ps -q)" ||:
docker rm -f "$(docker ps -a -q)" ||:
sudo rm -fr "$TEMP_PATH"
UnitTestsTsan:
needs: [BuilderDebTsan]
runs-on: [self-hosted, fuzzer-unit-tester]

View File

@ -261,7 +261,7 @@ endif ()
# Add a section with the hash of the compiled machine code for integrity checks.
# Only for official builds, because adding a section can be time consuming (rewrite of several GB).
# And cross-compiled binaries are not supported (since you cannot execute clickhouse hash-binary)
if (OBJCOPY_PATH AND YANDEX_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE))
if (OBJCOPY_PATH AND CLICKHOUSE_OFFICIAL_BUILD AND (NOT CMAKE_TOOLCHAIN_FILE))
set (USE_BINARY_HASH 1)
endif ()

View File

@ -51,6 +51,6 @@ if (GLIBC_COMPATIBILITY)
message (STATUS "Some symbols from glibc will be replaced for compatibility")
elseif (YANDEX_OFFICIAL_BUILD)
elseif (CLICKHOUSE_OFFICIAL_BUILD)
message (WARNING "Option GLIBC_COMPATIBILITY must be turned on for production builds.")
endif ()

View File

@ -18,6 +18,6 @@ set (VERSION_STRING_SHORT "${VERSION_MAJOR}.${VERSION_MINOR}")
math (EXPR VERSION_INTEGER "${VERSION_PATCH} + ${VERSION_MINOR}*1000 + ${VERSION_MAJOR}*1000000")
if(YANDEX_OFFICIAL_BUILD)
if(CLICKHOUSE_OFFICIAL_BUILD)
set(VERSION_OFFICIAL " (official build)")
endif()

View File

@ -69,9 +69,10 @@ endif ()
target_compile_options(_avrocpp PRIVATE ${SUPPRESS_WARNINGS})
# create a symlink to include headers with <avro/...>
set(AVRO_INCLUDE_DIR "${CMAKE_CURRENT_BINARY_DIR}/include")
ADD_CUSTOM_TARGET(avro_symlink_headers ALL
COMMAND ${CMAKE_COMMAND} -E make_directory "${AVROCPP_ROOT_DIR}/include"
COMMAND ${CMAKE_COMMAND} -E create_symlink "${AVROCPP_ROOT_DIR}/api" "${AVROCPP_ROOT_DIR}/include/avro"
COMMAND ${CMAKE_COMMAND} -E make_directory "${AVRO_INCLUDE_DIR}"
COMMAND ${CMAKE_COMMAND} -E create_symlink "${AVROCPP_ROOT_DIR}/api" "${AVRO_INCLUDE_DIR}/avro"
)
add_dependencies(_avrocpp avro_symlink_headers)
target_include_directories(_avrocpp SYSTEM BEFORE PUBLIC "${AVROCPP_ROOT_DIR}/include")
target_include_directories(_avrocpp SYSTEM BEFORE PUBLIC "${AVRO_INCLUDE_DIR}")

View File

@ -27,7 +27,11 @@ target_include_directories (_boost_headers_only SYSTEM BEFORE INTERFACE ${LIBRAR
# asio
target_compile_definitions (_boost_headers_only INTERFACE BOOST_ASIO_STANDALONE=1)
target_compile_definitions (_boost_headers_only INTERFACE
BOOST_ASIO_STANDALONE=1
# Avoid using std::result_of, which is deprecated since C++17
BOOST_ASIO_HAS_STD_INVOKE_RESULT=1
)
# iostreams

2
contrib/hyperscan vendored

@ -1 +1 @@
Subproject commit e9f08df0213fc637aac0a5bbde9beeaeba2fe9fa
Subproject commit 5edc68c5ac68d2d4f876159e9ee84def6d3dc87c

2
contrib/libcxx vendored

@ -1 +1 @@
Subproject commit 61e60294b1de01483caa9f5d00f437c99b674de6
Subproject commit 172b2ae074f6755145b91c53a95c8540c1468239

View File

@ -18,12 +18,14 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/filesystem/directory_iterator.cpp"
"${LIBCXX_SOURCE_DIR}/src/filesystem/int128_builtins.cpp"
"${LIBCXX_SOURCE_DIR}/src/filesystem/operations.cpp"
"${LIBCXX_SOURCE_DIR}/src/format.cpp"
"${LIBCXX_SOURCE_DIR}/src/functional.cpp"
"${LIBCXX_SOURCE_DIR}/src/future.cpp"
"${LIBCXX_SOURCE_DIR}/src/hash.cpp"
"${LIBCXX_SOURCE_DIR}/src/ios.cpp"
"${LIBCXX_SOURCE_DIR}/src/ios.instantiations.cpp"
"${LIBCXX_SOURCE_DIR}/src/iostream.cpp"
"${LIBCXX_SOURCE_DIR}/src/legacy_pointer_safety.cpp"
"${LIBCXX_SOURCE_DIR}/src/locale.cpp"
"${LIBCXX_SOURCE_DIR}/src/memory.cpp"
"${LIBCXX_SOURCE_DIR}/src/mutex.cpp"
@ -33,6 +35,9 @@ set(SRCS
"${LIBCXX_SOURCE_DIR}/src/random.cpp"
"${LIBCXX_SOURCE_DIR}/src/random_shuffle.cpp"
"${LIBCXX_SOURCE_DIR}/src/regex.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2fixed.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/d2s.cpp"
"${LIBCXX_SOURCE_DIR}/src/ryu/f2s.cpp"
"${LIBCXX_SOURCE_DIR}/src/shared_mutex.cpp"
"${LIBCXX_SOURCE_DIR}/src/stdexcept.cpp"
"${LIBCXX_SOURCE_DIR}/src/string.cpp"
@ -49,7 +54,9 @@ set(SRCS
add_library(cxx ${SRCS})
set_target_properties(cxx PROPERTIES FOLDER "contrib/libcxx-cmake")
target_include_directories(cxx SYSTEM BEFORE PUBLIC $<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>)
target_include_directories(cxx SYSTEM BEFORE PUBLIC
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}/include>
$<BUILD_INTERFACE:${LIBCXX_SOURCE_DIR}>/src)
target_compile_definitions(cxx PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
# Enable capturing stack traces for all exceptions.

2
contrib/libcxxabi vendored

@ -1 +1 @@
Subproject commit df8f1e727dbc9e2bedf2282096fa189dc3fe0076
Subproject commit 6eb7cc7a7bdd779e6734d1b9fb451df2274462d7

View File

@ -1,24 +1,24 @@
set(LIBCXXABI_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi")
set(SRCS
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_stdexcept.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_virtual.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/fallback_malloc.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_default_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/abort_message.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_aux_runtime.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_default_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_demangle.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_exception_storage.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/private_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_aux_runtime.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_vector.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/cxa_virtual.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/fallback_malloc.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/private_typeinfo.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_exception.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_new_delete.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_stdexcept.cpp"
"${LIBCXXABI_SOURCE_DIR}/src/stdlib_typeinfo.cpp"
)
add_library(cxxabi ${SRCS})
@ -30,6 +30,7 @@ target_compile_options(cxxabi PRIVATE -w)
target_include_directories(cxxabi SYSTEM BEFORE
PUBLIC $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/include>
PRIVATE $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/../libcxx/include>
PRIVATE $<BUILD_INTERFACE:${LIBCXXABI_SOURCE_DIR}/../libcxx/src>
)
target_compile_definitions(cxxabi PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi PRIVATE -nostdinc++ -fno-sanitize=undefined -Wno-macro-redefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.

View File

@ -1,12 +1,9 @@
# During cross-compilation in our CI, llvm-tblgen and the other build tools
# have to be built for the host architecture, while everything else is built for the target architecture (e.g. AArch64).
# A possible workaround is to use llvm-tblgen from some package...
# But let's just enable LLVM for native builds.
if (CMAKE_CROSSCOMPILING OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
if (APPLE OR NOT ARCH_AMD64 OR SANITIZE STREQUAL "undefined")
set (ENABLE_EMBEDDED_COMPILER_DEFAULT OFF)
else()
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
set (ENABLE_EMBEDDED_COMPILER_DEFAULT ON)
endif()
option (ENABLE_EMBEDDED_COMPILER "Enable support for 'compile_expressions' option for query execution" ${ENABLE_EMBEDDED_COMPILER_DEFAULT})
if (NOT ENABLE_EMBEDDED_COMPILER)

2
contrib/replxx vendored

@ -1 +1 @@
Subproject commit 9460e5e0fc10f78f460af26a6bd928798cac864d
Subproject commit 6f0b6f151ae2a044625ae93acd19ca365fcea64d

View File

@ -163,6 +163,7 @@ def parse_env_variables(
cmake_flags.append("-DCMAKE_INSTALL_PREFIX=/usr")
cmake_flags.append("-DCMAKE_INSTALL_SYSCONFDIR=/etc")
cmake_flags.append("-DCMAKE_INSTALL_LOCALSTATEDIR=/var")
cmake_flags.append("-DBUILD_STANDALONE_KEEPER=ON")
if is_release_build(build_type, package_type, sanitizer, split_binary):
cmake_flags.append("-DINSTALL_STRIPPED_BINARIES=ON")
@ -244,7 +245,7 @@ def parse_env_variables(
result.append(f"AUTHOR='{author}'")
if official:
cmake_flags.append("-DYANDEX_OFFICIAL_BUILD=1")
cmake_flags.append("-DCLICKHOUSE_OFFICIAL_BUILD=1")
result.append('CMAKE_FLAGS="' + " ".join(cmake_flags) + '"')

View File

@ -267,6 +267,7 @@ function run_tests
local test_opts=(
--hung-check
--fast-tests-only
--no-random-settings
--no-long
--testname
--shard

View File

@ -13,7 +13,7 @@ script_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null 2>&1 && pwd )"
echo "$script_dir"
repo_dir=ch
BINARY_TO_DOWNLOAD=${BINARY_TO_DOWNLOAD:="clang-13_debug_none_bundled_unsplitted_disable_False_binary"}
BINARY_URL_TO_DOWNLOAD=${BINARY_URL_TO_DOWNLOAD:="https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"}
BINARY_URL_TO_DOWNLOAD=${BINARY_URL_TO_DOWNLOAD:="https://clickhouse-builds.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/$BINARY_TO_DOWNLOAD/clickhouse"}
function clone
{

View File

@ -2,7 +2,7 @@
set -euo pipefail
CLICKHOUSE_PACKAGE=${CLICKHOUSE_PACKAGE:="https://clickhouse-builds.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/clang-13_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"}
CLICKHOUSE_PACKAGE=${CLICKHOUSE_PACKAGE:="https://clickhouse-builds.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/clickhouse_build_check/clang-13_relwithdebuginfo_none_bundled_unsplitted_disable_False_binary/clickhouse"}
CLICKHOUSE_REPO_PATH=${CLICKHOUSE_REPO_PATH:=""}
@ -10,7 +10,7 @@ if [ -z "$CLICKHOUSE_REPO_PATH" ]; then
CLICKHOUSE_REPO_PATH=ch
rm -rf ch ||:
mkdir ch ||:
wget -nv -nd -c "https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/repo/clickhouse_no_subs.tar.gz"
wget -nv -nd -c "https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/repo/clickhouse_no_subs.tar.gz"
tar -C ch --strip-components=1 -xf clickhouse_no_subs.tar.gz
ls -lath ||:
fi

View File

@ -1294,15 +1294,15 @@ create table ci_checks engine File(TSVWithNamesAndTypes, 'ci-checks.tsv')
select '' test_name,
'$(sed -n 's/.*<!--message: \(.*\)-->/\1/p' report.html)' test_status,
0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
union all
select test || ' #' || toString(query_index), 'slower' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
|| test || '.' || toString(query_index) report_url
from queries where changed_fail != 0 and diff > 0
union all
select test || ' #' || toString(query_index), 'unstable' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
'https://clickhouse-test-reports.s3.amazonaws.com/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
|| test || '.' || toString(query_index) report_url
from queries where unstable_fail != 0
)

View File

@ -16,26 +16,17 @@ right_sha=$4
datasets=${CHPC_DATASETS-"hits1 hits10 hits100 values"}
declare -A dataset_paths
if [[ $S3_URL == *"s3.amazonaws.com"* ]]; then
dataset_paths["hits10"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.amazonaws.com/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.amazonaws.com/values_with_expressions/partitions/test_values.tar"
else
dataset_paths["hits10"]="https://s3.mds.yandex.net/clickhouse-private-datasets/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://s3.mds.yandex.net/clickhouse-private-datasets/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.yandex.net/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.yandex.net/values_with_expressions/partitions/test_values.tar"
fi
dataset_paths["hits10"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_10m_single/partitions/hits_10m_single.tar"
dataset_paths["hits100"]="https://clickhouse-private-datasets.s3.amazonaws.com/hits_100m_single/partitions/hits_100m_single.tar"
dataset_paths["hits1"]="https://clickhouse-datasets.s3.amazonaws.com/hits/partitions/hits_v1.tar"
dataset_paths["values"]="https://clickhouse-datasets.s3.amazonaws.com/values_with_expressions/partitions/test_values.tar"
function download
{
# Historically there were various paths for the performance test package.
# Test all of them.
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/$left_pr/$left_sha/performance/performance.tgz"
"https://clickhouse-builds.s3.yandex.net/$left_pr/$left_sha/clickhouse_build_check/performance/performance.tgz"
)
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/$left_pr/$left_sha/performance/performance.tgz")
for path in "${urls_to_try[@]}"
do

View File

@ -4,7 +4,7 @@ set -ex
CHPC_CHECK_START_TIMESTAMP="$(date +%s)"
export CHPC_CHECK_START_TIMESTAMP
S3_URL=${S3_URL:="https://clickhouse-builds.s3.yandex.net"}
S3_URL=${S3_URL:="https://clickhouse-builds.s3.amazonaws.com"}
COMMON_BUILD_PREFIX="/clickhouse_build_check"
if [[ $S3_URL == *"s3.amazonaws.com"* ]]; then
@ -64,9 +64,7 @@ function find_reference_sha
# Historically there were various paths for the performance test package,
# test all of them.
unset found
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/0/$REF_SHA/performance/performance.tgz"
"https://clickhouse-builds.s3.yandex.net/0/$REF_SHA/clickhouse_build_check/performance/performance.tgz"
)
declare -a urls_to_try=("https://s3.amazonaws.com/clickhouse-builds/0/$REF_SHA/performance/performance.tgz")
for path in "${urls_to_try[@]}"
do
if curl_with_retry "$path"

View File

@ -11,7 +11,7 @@ RUN apt-get update -y \
COPY s3downloader /s3downloader
ENV S3_URL="https://clickhouse-datasets.s3.yandex.net"
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV DATASETS="hits visits"
ENV EXPORT_S3_STORAGE_POLICIES=1

View File

@ -10,7 +10,7 @@ import requests
import tempfile
DEFAULT_URL = 'https://clickhouse-datasets.s3.yandex.net'
DEFAULT_URL = 'https://clickhouse-datasets.s3.amazonaws.com'
AVAILABLE_DATASETS = {
'hits': 'hits_v1.tar',

View File

@ -41,6 +41,7 @@ sleep 5
./mc admin user add clickminio test testtest
./mc admin policy set clickminio readwrite user=test
./mc mb clickminio/test
./mc policy set public clickminio/test
# Upload data to Minio. By default after unpacking all tests will in

View File

@ -29,7 +29,7 @@ COPY ./download_previous_release /download_previous_release
COPY run.sh /
ENV DATASETS="hits visits"
ENV S3_URL="https://clickhouse-datasets.s3.yandex.net"
ENV S3_URL="https://clickhouse-datasets.s3.amazonaws.com"
ENV EXPORT_S3_STORAGE_POLICIES=1
CMD ["/bin/bash", "/run.sh"]

View File

@ -1616,3 +1616,14 @@ Possible values:
Default value: `10000`.
## global_memory_usage_overcommit_max_wait_microseconds {#global_memory_usage_overcommit_max_wait_microseconds}
Sets the maximum waiting time for the global overcommit tracker.
Possible values:
- Positive integer.
Default value: `0`.

View File

@ -0,0 +1,31 @@
# Memory overcommit
Memory overcommit is an experimental technique that allows setting more flexible memory limits for queries.
The idea is to introduce settings that represent the guaranteed amount of memory a query can use.
When memory overcommit is enabled and the memory limit is reached, ClickHouse selects the most overcommitted query and tries to free memory by killing it.
When the memory limit is reached, a query waits for some time while attempting to allocate new memory.
If the timeout passes and memory is freed, the query continues execution. Otherwise an exception is thrown and the query is killed.
The query to stop or kill is selected by either the global or the user overcommit tracker, depending on which memory limit was reached.
## User overcommit tracker
The user overcommit tracker finds the query with the biggest overcommit ratio in the user's query list.
The overcommit ratio is computed as the number of allocated bytes divided by the value of the `max_guaranteed_memory_usage` setting.
The waiting timeout is set by the `memory_usage_overcommit_max_wait_microseconds` setting.
**Example**
```sql
SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage=4000, memory_usage_overcommit_max_wait_microseconds=500
```
## Global overcommit tracker
The global overcommit tracker finds the query with the biggest overcommit ratio in the list of all queries.
In this case the overcommit ratio is computed as the number of allocated bytes divided by the value of the `max_guaranteed_memory_usage_for_user` setting.
The waiting timeout is set by the `global_memory_usage_overcommit_max_wait_microseconds` parameter in the configuration file.
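**Example**

A minimal sketch mirroring the user-level example above; the value is arbitrary, and the global waiting timeout itself comes from `global_memory_usage_overcommit_max_wait_microseconds` in the server configuration rather than from query-level settings.
```sql
SELECT number FROM numbers(1000) GROUP BY number SETTINGS max_guaranteed_memory_usage_for_user=4000
```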

View File

@ -4220,10 +4220,36 @@ Possible values:
- 0 — Disabled.
- 1 — Enabled. The wait time is equal to the `shutdown_wait_unfinished` config value.
Default value: 0.
Default value: `0`.
## shutdown_wait_unfinished
The waiting time in seconds for currently handled connections when shutting down the server.
Default Value: 5.
Default Value: `5`.
## max_guaranteed_memory_usage
Maximum guaranteed memory usage for processing a single query.
It represents a soft limit in case the hard limit is reached at the user level.
Zero means unlimited.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
## memory_usage_overcommit_max_wait_microseconds
Maximum time a thread will wait for memory to be freed in the case of memory overcommit at the user level.
If the timeout is reached and memory is not freed, an exception is thrown.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
## max_guaranteed_memory_usage_for_user
Maximum guaranteed memory usage for processing all concurrently running queries of the user.
It represents a soft limit in case the hard limit is reached at the global level.
Zero means unlimited.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `0`.
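As an illustrative sketch (values are arbitrary, matching the example in [memory overcommit](memory-overcommit.md)), the user-level soft limit and its waiting timeout can be supplied per query:
```sql
SELECT number FROM numbers(1000) GROUP BY number
SETTINGS max_guaranteed_memory_usage = 4000, memory_usage_overcommit_max_wait_microseconds = 500
```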

View File

@ -0,0 +1,28 @@
# package sources should be placed in ${PWD}/root
# nfpm should be run from the same directory as the config
name: "clickhouse-keeper-dbg"
arch: "${DEB_ARCH}" # amd64, arm64
platform: "linux"
version: "${CLICKHOUSE_VERSION_STRING}"
vendor: "ClickHouse Inc."
homepage: "https://clickhouse.com"
license: "Apache"
section: "database"
priority: "optional"
maintainer: "ClickHouse Dev Team <packages+linux@clickhouse.com>"
description: |
debugging symbols for clickhouse-keeper
This package contains the debugging symbols for clickhouse-keeper.
contents:
- src: root/usr/lib/debug/usr/bin/clickhouse-keeper.debug
dst: /usr/lib/debug/usr/bin/clickhouse-keeper.debug
# docs
- src: ../AUTHORS
dst: /usr/share/doc/clickhouse-keeper-dbg/AUTHORS
- src: ../CHANGELOG.md
dst: /usr/share/doc/clickhouse-keeper-dbg/CHANGELOG.md
- src: ../LICENSE
dst: /usr/share/doc/clickhouse-keeper-dbg/LICENSE
- src: ../README.md
dst: /usr/share/doc/clickhouse-keeper-dbg/README.md

View File

@ -0,0 +1,40 @@
# package sources should be placed in ${PWD}/root
# nfpm should be run from the same directory as the config
name: "clickhouse-keeper"
arch: "${DEB_ARCH}" # amd64, arm64
platform: "linux"
version: "${CLICKHOUSE_VERSION_STRING}"
vendor: "ClickHouse Inc."
homepage: "https://clickhouse.com"
license: "Apache"
section: "database"
priority: "optional"
conflicts:
- clickhouse-server
depends:
- adduser
suggests:
- clickhouse-keeper-dbg
maintainer: "ClickHouse Dev Team <packages+linux@clickhouse.com>"
description: |
Static clickhouse-keeper binary
A stand-alone clickhouse-keeper package
contents:
- src: root/etc/clickhouse-keeper
dst: /etc/clickhouse-keeper
type: config
- src: root/usr/bin/clickhouse-keeper
dst: /usr/bin/clickhouse-keeper
# docs
- src: ../AUTHORS
dst: /usr/share/doc/clickhouse-keeper/AUTHORS
- src: ../CHANGELOG.md
dst: /usr/share/doc/clickhouse-keeper/CHANGELOG.md
- src: ../LICENSE
dst: /usr/share/doc/clickhouse-keeper/LICENSE
- src: ../README.md
dst: /usr/share/doc/clickhouse-keeper/README.md

View File

@ -71,17 +71,11 @@ if (BUILD_STANDALONE_KEEPER)
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedReadBufferFromFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressedWriteBuffer.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecDoubleDelta.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecEncrypted.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecGorilla.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecLZ4.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecMultiple.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecNone.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecT64.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionCodecZSTD.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/CompressionFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/getCompressionCodecForFile.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/ICompressionCodec.cpp
${CMAKE_CURRENT_SOURCE_DIR}/../../src/Compression/LZ4_decompress_faster.cpp

View File

@ -829,6 +829,36 @@ if (ThreadFuzzer::instance().isEffective())
fs::create_directories(path / "metadata_dropped/");
}
#if USE_ROCKSDB
/// Initialize merge tree metadata cache
if (config().has("merge_tree_metadata_cache"))
{
fs::create_directories(path / "rocksdb/");
size_t size = config().getUInt64("merge_tree_metadata_cache.lru_cache_size", 256 << 20);
bool continue_if_corrupted = config().getBool("merge_tree_metadata_cache.continue_if_corrupted", false);
try
{
LOG_DEBUG(
log, "Initiailizing merge tree metadata cache lru_cache_size:{} continue_if_corrupted:{}", size, continue_if_corrupted);
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
}
catch (...)
{
if (continue_if_corrupted)
{
/// Rename rocksdb directory and reinitialize merge tree metadata cache
time_t now = time(nullptr);
fs::rename(path / "rocksdb", path / ("rocksdb.old." + std::to_string(now)));
global_context->initializeMergeTreeMetadataCache(path_str + "/" + "rocksdb", size);
}
else
{
throw;
}
}
}
#endif
if (config().has("interserver_http_port") && config().has("interserver_https_port"))
throw Exception("Both http and https interserver ports are specified", ErrorCodes::EXCESSIVE_ELEMENT_IN_CONFIG);

View File

@ -1294,4 +1294,10 @@
</tables>
</rocksdb>
-->
<!-- Uncomment to enable the merge tree metadata cache -->
<merge_tree_metadata_cache>
<lru_cache_size>268435456</lru_cache_size>
<continue_if_corrupted>true</continue_if_corrupted>
</merge_tree_metadata_cache>
</clickhouse>

View File

@ -13,7 +13,7 @@ enum class QuotaType
{
QUERIES, /// Number of queries.
QUERY_SELECTS, /// Number of select queries.
QUERY_INSERTS, /// Number of inserts queries.
QUERY_INSERTS, /// Number of insert queries.
ERRORS, /// Number of queries with exceptions.
RESULT_ROWS, /// Number of rows returned as result.
RESULT_BYTES, /// Number of bytes returned as result.

View File

@ -67,7 +67,7 @@ auto parseArguments(const std::string & name, const DataTypes & arguments)
values_types.push_back(array_type->getNestedType());
}
return std::tuple{std::move(keys_type), std::move(values_types), tuple_argument};
return std::tuple<DataTypePtr, DataTypes, bool>{std::move(keys_type), std::move(values_types), tuple_argument};
}
// This function instantiates a particular overload of the sumMap family of

View File

@ -494,6 +494,11 @@ endif()
target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::fast_float)
if (USE_ORC)
dbms_target_link_libraries(PUBLIC ${ORC_LIBRARIES})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ORC_INCLUDE_DIR} "${CMAKE_BINARY_DIR}/contrib/orc/c++/include")
endif ()
if (TARGET ch_contrib::rocksdb)
dbms_target_link_libraries(PUBLIC ch_contrib::rocksdb)
endif()
@ -573,10 +578,6 @@ if (ENABLE_TESTS)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::simdjson)
endif()
if(TARGET ch_contrib::rapidjson)
target_include_directories(unit_tests_dbms PRIVATE ch_contrib::rapidjson)
endif()
if (TARGET ch_contrib::yaml_cpp)
target_link_libraries(unit_tests_dbms PRIVATE ch_contrib::yaml_cpp)
endif()

View File

@ -521,7 +521,7 @@ ColumnObject::ColumnObject(bool is_nullable_)
{
}
ColumnObject::ColumnObject(SubcolumnsTree && subcolumns_, bool is_nullable_)
ColumnObject::ColumnObject(Subcolumns && subcolumns_, bool is_nullable_)
: is_nullable(is_nullable_)
, subcolumns(std::move(subcolumns_))
, num_rows(subcolumns.empty() ? 0 : (*subcolumns.begin())->data.size())
@ -696,7 +696,7 @@ const ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & ke
ColumnObject::Subcolumn & ColumnObject::getSubcolumn(const PathInData & key)
{
if (const auto * node = subcolumns.findLeaf(key))
return const_cast<SubcolumnsTree::Node *>(node)->data;
return const_cast<Subcolumns::Node *>(node)->data;
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "There is no subcolumn {} in ColumnObject", key.getPath());
}
@ -794,7 +794,7 @@ bool ColumnObject::isFinalized() const
void ColumnObject::finalize()
{
size_t old_size = size();
SubcolumnsTree new_subcolumns;
Subcolumns new_subcolumns;
for (auto && entry : subcolumns)
{
const auto & least_common_type = entry->data.getLeastCommonType();

View File

@ -138,20 +138,20 @@ public:
size_t num_of_defaults_in_prefix = 0;
};
using SubcolumnsTree = SubcolumnsTree<Subcolumn>;
using Subcolumns = SubcolumnsTree<Subcolumn>;
private:
/// If true then all subcolumns are nullable.
const bool is_nullable;
SubcolumnsTree subcolumns;
Subcolumns subcolumns;
size_t num_rows;
public:
static constexpr auto COLUMN_NAME_DUMMY = "_dummy";
explicit ColumnObject(bool is_nullable_);
ColumnObject(SubcolumnsTree && subcolumns_, bool is_nullable_);
ColumnObject(Subcolumns && subcolumns_, bool is_nullable_);
/// Checks that all subcolumns have consistent sizes.
void checkConsistency() const;
@ -173,8 +173,8 @@ public:
/// It cares about consistency of sizes of Nested arrays.
void addNestedSubcolumn(const PathInData & key, const FieldInfo & field_info, size_t new_size);
const SubcolumnsTree & getSubcolumns() const { return subcolumns; }
SubcolumnsTree & getSubcolumns() { return subcolumns; }
const Subcolumns & getSubcolumns() const { return subcolumns; }
Subcolumns & getSubcolumns() { return subcolumns; }
PathsInData getKeys() const;
/// Finalizes all subcolumns.

View File

@ -437,6 +437,7 @@ String FileSegment::stateToString(FileSegment::State state)
case FileSegment::State::SKIP_CACHE:
return "SKIP_CACHE";
}
__builtin_unreachable();
}
String FileSegmentsHolder::toString()

View File

@ -23,6 +23,12 @@ void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker)
{
// NOTE: Do not change the order of locks
//
// global_mutex must be acquired before overcommit_m, because
// method OvercommitTracker::unsubscribe(MemoryTracker *) is
// always called with already acquired global_mutex in
// ProcessListEntry::~ProcessListEntry().
std::unique_lock<std::mutex> global_lock(global_mutex);
std::unique_lock<std::mutex> lk(overcommit_m);
@ -76,7 +82,7 @@ void UserOvercommitTracker::pickQueryToExcludeImpl()
MemoryTracker * query_tracker = nullptr;
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only.
// BlockQueryIfMemoryLimit is used in ProcessList to guarantee this.
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
auto & queries = user_process_list->queries;
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", queries.size());
for (auto const & query : queries)
@ -111,9 +117,9 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
MemoryTracker * query_tracker = nullptr;
OvercommitRatio current_ratio{0, 0};
// At this moment query list must be read only.
// BlockQueryIfMemoryLimit is used in ProcessList to guarantee this.
LOG_DEBUG(logger, "Trying to choose query to stop");
process_list->processEachQueryStatus([&](DB::QueryStatus const & query)
// This is guaranteed by locking global_mutex in OvercommitTracker::needToStopQuery.
LOG_DEBUG(logger, "Trying to choose query to stop from {} queries", process_list->size());
for (auto const & query : process_list->processes)
{
if (query.isKilled())
return;
@ -134,7 +140,7 @@ void GlobalOvercommitTracker::pickQueryToExcludeImpl()
query_tracker = memory_tracker;
current_ratio = ratio;
}
});
}
LOG_DEBUG(logger, "Selected to stop query with overcommit ratio {}/{}",
current_ratio.committed, current_ratio.soft_limit);
picked_tracker = query_tracker;

View File

@ -43,8 +43,6 @@ class MemoryTracker;
// is killed to free memory.
struct OvercommitTracker : boost::noncopyable
{
explicit OvercommitTracker(std::mutex & global_mutex_);
void setMaxWaitTime(UInt64 wait_time);
bool needToStopQuery(MemoryTracker * tracker);
@ -54,8 +52,12 @@ struct OvercommitTracker : boost::noncopyable
virtual ~OvercommitTracker() = default;
protected:
explicit OvercommitTracker(std::mutex & global_mutex_);
virtual void pickQueryToExcludeImpl() = 0;
// This mutex is used to disallow concurrent access
// to picked_tracker and cancelation_state variables.
mutable std::mutex overcommit_m;
mutable std::condition_variable cv;
@ -87,6 +89,11 @@ private:
}
}
// Global mutex which is used in ProcessList to synchronize
// insertion and deletion of queries.
// OvercommitTracker::pickQueryToExcludeImpl() implementations
// require this mutex to be locked, because they read list (or sublist)
// of queries.
std::mutex & global_mutex;
};

View File

@ -9,6 +9,7 @@
M(SelectQuery, "Same as Query, but only for SELECT queries.") \
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(AsyncInsertQuery, "Same as InsertQuery, but only for asynchronous INSERT queries.") \
M(AsyncInsertBytes, "Data size in bytes of asynchronous INSERT queries.") \
M(FailedQuery, "Number of failed queries.") \
M(FailedSelectQuery, "Same as FailedQuery, but only for SELECT queries.") \
M(FailedInsertQuery, "Same as FailedQuery, but only for INSERT queries.") \
@ -284,6 +285,13 @@
\
M(MainConfigLoads, "Number of times the main configuration was reloaded.") \
\
M(MergeTreeMetadataCacheGet, "Number of rocksdb reads (used for merge tree metadata cache)") \
M(MergeTreeMetadataCachePut, "Number of rocksdb puts (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheDelete, "Number of rocksdb deletes (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheSeek, "Number of rocksdb seeks (used for merge tree metadata cache)") \
M(MergeTreeMetadataCacheHit, "Number of times the read of meta file was done from MergeTree metadata cache") \
M(MergeTreeMetadataCacheMiss, "Number of times the read of meta file was not done from MergeTree metadata cache") \
\
M(ScalarSubqueriesGlobalCacheHit, "Number of times a read from a scalar subquery was done using the global cache") \
M(ScalarSubqueriesLocalCacheHit, "Number of times a read from a scalar subquery was done using the local cache") \
M(ScalarSubqueriesCacheMiss, "Number of times a read from a scalar subquery was not cached and had to be calculated completely")

View File

@ -1,7 +1,18 @@
#include "gtest_global_context.h"
const ContextHolder & getContext()
{
return getMutableContext();
}
ContextHolder & getMutableContext()
{
static ContextHolder holder;
return holder;
}
void destroyContext()
{
auto & holder = getMutableContext();
return holder.destroy();
}

View File

@ -16,6 +16,17 @@ struct ContextHolder
}
ContextHolder(ContextHolder &&) = default;
void destroy()
{
context->shutdown();
context.reset();
shared_context.reset();
}
};
const ContextHolder & getContext();
ContextHolder & getMutableContext();
void destroyContext();

View File

@ -165,25 +165,36 @@ void registerCodecNone(CompressionCodecFactory & factory);
void registerCodecLZ4(CompressionCodecFactory & factory);
void registerCodecLZ4HC(CompressionCodecFactory & factory);
void registerCodecZSTD(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
/// Keeper uses only general-purpose codecs, so we don't need these special codecs
/// in the standalone build
#ifndef KEEPER_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
void registerCodecGorilla(CompressionCodecFactory & factory);
void registerCodecEncrypted(CompressionCodecFactory & factory);
void registerCodecMultiple(CompressionCodecFactory & factory);
#endif
CompressionCodecFactory::CompressionCodecFactory()
{
registerCodecLZ4(*this);
registerCodecNone(*this);
registerCodecLZ4(*this);
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);
registerCodecGorilla(*this);
registerCodecEncrypted(*this);
registerCodecMultiple(*this);
#endif
default_codec = get("LZ4", {});
}

View File

@ -187,5 +187,4 @@ DECLARE_SETTING_ENUM_WITH_RENAME(EnumComparingMode, FormatSettings::EnumComparin
DECLARE_SETTING_ENUM_WITH_RENAME(EscapingRule, FormatSettings::EscapingRule)
DECLARE_SETTING_ENUM_WITH_RENAME(MsgPackUUIDRepresentation, FormatSettings::MsgPackUUIDRepresentation)
}

View File

@ -63,12 +63,12 @@ private:
size_t num_dimensions_to_keep;
};
using Node = typename ColumnObject::SubcolumnsTree::Node;
using Node = typename ColumnObject::Subcolumns::Node;
/// Finds a subcolumn from the same Nested type as @entry and inserts
/// an array with default values with consistent sizes as in Nested type.
bool tryInsertDefaultFromNested(
const std::shared_ptr<Node> & entry, const ColumnObject::SubcolumnsTree & subcolumns)
const std::shared_ptr<Node> & entry, const ColumnObject::Subcolumns & subcolumns)
{
if (!entry->path.hasNested())
return false;
@ -198,7 +198,7 @@ void SerializationObject<Parser>::deserializeWholeText(IColumn & column, ReadBuf
template <typename Parser>
void SerializationObject<Parser>::deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings &) const
{
deserializeTextImpl(column, [&](String & s) { readEscapedStringInto(s, istr); });
deserializeTextImpl(column, [&](String & s) { readEscapedString(s, istr); });
}
template <typename Parser>

View File

@ -96,6 +96,7 @@ private:
case ReadType::REMOTE_FS_READ_AND_PUT_IN_CACHE:
return "REMOTE_FS_READ_AND_PUT_IN_CACHE";
}
__builtin_unreachable();
}
size_t first_offset = 0;
};

View File

@ -9,6 +9,7 @@
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeObject.h>
#include <Common/JSONParsers/SimdJSONParser.h>
#include <Common/JSONParsers/RapidJSONParser.h>
#include <Common/JSONParsers/DummyJSONParser.h>
@ -158,22 +159,37 @@ DataTypePtr getDataTypeFromJSONFieldImpl(const Element & field)
{
auto object = field.getObject();
DataTypePtr value_type;
bool is_object = false;
for (const auto key_value_pair : object)
{
auto type = getDataTypeFromJSONFieldImpl(key_value_pair.second);
if (!type)
return nullptr;
continue;
if (value_type && value_type->getName() != type->getName())
return nullptr;
if (isObject(type))
{
is_object = true;
break;
}
value_type = type;
if (!value_type)
{
value_type = type;
}
else if (!value_type->equals(*type))
{
is_object = true;
break;
}
}
if (!value_type)
return nullptr;
if (is_object)
return std::make_shared<DataTypeObject>("json", false);
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), value_type);
if (value_type)
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), value_type);
return nullptr;
}
throw Exception{ErrorCodes::INCORRECT_DATA, "Unexpected JSON type"};

View File

@ -7,6 +7,8 @@
#include <Formats/ReadSchemaUtils.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Common/assert_cast.h>
#include <Interpreters/Context.h>
#include <Storages/IStorage.h>
namespace DB
{
@ -17,6 +19,28 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static std::optional<NamesAndTypesList> getOrderedColumnsList(
const NamesAndTypesList & columns_list, const Names & columns_order_hint)
{
if (columns_list.size() != columns_order_hint.size())
return {};
std::unordered_map<String, DataTypePtr> available_columns;
for (const auto & [name, type] : columns_list)
available_columns.emplace(name, type);
NamesAndTypesList res;
for (const auto & name : columns_order_hint)
{
auto it = available_columns.find(name);
if (it == available_columns.end())
return {};
res.emplace_back(name, it->second);
}
return res;
}
ColumnsDescription readSchemaFromFormat(
const String & format_name,
const std::optional<FormatSettings> & format_settings,
@ -52,6 +76,22 @@ ColumnsDescription readSchemaFromFormat(
{
throw Exception(ErrorCodes::CANNOT_EXTRACT_TABLE_STRUCTURE, "Cannot extract table structure from {} format file. Error: {}", format_name, e.message());
}
/// If we have "INSERT SELECT" query then try to order
/// columns as they are ordered in table schema for formats
/// without strict column order (like JSON and TSKV).
/// It will allow to execute simple data loading with query
/// "INSERT INTO table SELECT * FROM ..."
const auto & insertion_table = context->getInsertionTable();
if (!schema_reader->hasStrictOrderOfColumns() && !insertion_table.empty())
{
auto storage = DatabaseCatalog::instance().getTable(insertion_table, context);
auto metadata = storage->getInMemoryMetadataPtr();
auto names_in_storage = metadata->getColumns().getNamesOfPhysical();
auto ordered_list = getOrderedColumnsList(names_and_types, names_in_storage);
if (ordered_list)
names_and_types = *ordered_list;
}
}
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{} file format doesn't support schema inference", format_name);

View File

@ -13,6 +13,7 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory);
void registerFileSegmentationEngineJSONCompactEachRow(FormatFactory & factory);
/// Formats for both input/output.
@ -103,6 +104,7 @@ void registerProtobufSchemaReader(FormatFactory & factory);
void registerProtobufListSchemaReader(FormatFactory & factory);
void registerLineAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsStringSchemaReader(FormatFactory & factory);
void registerJSONAsObjectSchemaReader(FormatFactory & factory);
void registerRawBLOBSchemaReader(FormatFactory & factory);
void registerMsgPackSchemaReader(FormatFactory & factory);
void registerCapnProtoSchemaReader(FormatFactory & factory);
@ -123,6 +125,7 @@ void registerFormats()
registerFileSegmentationEngineJSONEachRow(factory);
registerFileSegmentationEngineRegexp(factory);
registerFileSegmentationEngineJSONAsString(factory);
registerFileSegmentationEngineJSONAsObject(factory);
registerFileSegmentationEngineJSONCompactEachRow(factory);
registerInputFormatNative(factory);
@ -207,6 +210,7 @@ void registerFormats()
registerProtobufListSchemaReader(factory);
registerLineAsStringSchemaReader(factory);
registerJSONAsStringSchemaReader(factory);
registerJSONAsObjectSchemaReader(factory);
registerRawBLOBSchemaReader(factory);
registerMsgPackSchemaReader(factory);
registerCapnProtoSchemaReader(factory);

View File

@ -53,6 +53,7 @@
#include <DataTypes/DataTypeLowCardinality.h>
#include <Columns/ColumnLowCardinality.h>
#include <Interpreters/Context.h>
#include <Common/HashTable/HashMap.h>
namespace DB
@ -3140,52 +3141,138 @@ private:
}
}
WrapperType createTupleToObjectWrapper(const DataTypeTuple & from_tuple, bool has_nullable_subcolumns) const
{
if (!from_tuple.haveExplicitNames())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_tuple.getName());
PathsInData paths;
DataTypes from_types;
std::tie(paths, from_types) = flattenTuple(from_tuple.getPtr());
auto to_types = from_types;
for (auto & type : to_types)
{
if (isTuple(type) || isNested(type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}",
from_tuple.getName());
type = recursiveRemoveLowCardinality(type);
}
return [element_wrappers = getElementWrappers(from_types, to_types),
has_nullable_subcolumns, from_types, to_types, paths]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t input_rows_count)
{
size_t tuple_size = to_types.size();
auto flattened_column = flattenTuple(arguments.front().column);
const auto & column_tuple = assert_cast<const ColumnTuple &>(*flattened_column);
if (tuple_size != column_tuple.getColumns().size())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Expected tuple with {} subcolumn, but got {} subcolumns",
tuple_size, column_tuple.getColumns().size());
auto res = ColumnObject::create(has_nullable_subcolumns);
for (size_t i = 0; i < tuple_size; ++i)
{
ColumnsWithTypeAndName element = {{column_tuple.getColumns()[i], from_types[i], "" }};
auto converted_column = element_wrappers[i](element, to_types[i], nullable_source, input_rows_count);
res->addSubcolumn(paths[i], converted_column->assumeMutable());
}
return res;
};
}
WrapperType createMapToObjectWrapper(const DataTypeMap & from_map, bool has_nullable_subcolumns) const
{
auto key_value_types = from_map.getKeyValueTypes();
if (!isStringOrFixedString(key_value_types[0]))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object from Map can be performed only from Map "
"with String or FixedString key. Got: {}", from_map.getName());
const auto & value_type = key_value_types[1];
auto to_value_type = value_type;
if (!has_nullable_subcolumns && value_type->isNullable())
to_value_type = removeNullable(value_type);
if (has_nullable_subcolumns && !value_type->isNullable())
to_value_type = makeNullable(value_type);
DataTypes to_key_value_types{std::make_shared<DataTypeString>(), std::move(to_value_type)};
auto element_wrappers = getElementWrappers(key_value_types, to_key_value_types);
return [has_nullable_subcolumns, element_wrappers, key_value_types, to_key_value_types]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t) -> ColumnPtr
{
const auto & column_map = assert_cast<const ColumnMap &>(*arguments.front().column);
const auto & offsets = column_map.getNestedColumn().getOffsets();
auto key_value_columns = column_map.getNestedData().getColumnsCopy();
for (size_t i = 0; i < 2; ++i)
{
ColumnsWithTypeAndName element{{key_value_columns[i], key_value_types[i], ""}};
key_value_columns[i] = element_wrappers[i](element, to_key_value_types[i], nullable_source, key_value_columns[i]->size());
}
const auto & key_column_str = assert_cast<const ColumnString &>(*key_value_columns[0]);
const auto & value_column = *key_value_columns[1];
using SubcolumnsMap = HashMap<StringRef, MutableColumnPtr, StringRefHash>;
SubcolumnsMap subcolumns;
for (size_t row = 0; row < offsets.size(); ++row)
{
for (size_t i = offsets[static_cast<ssize_t>(row) - 1]; i < offsets[row]; ++i)
{
auto ref = key_column_str.getDataAt(i);
bool inserted;
SubcolumnsMap::LookupResult it;
subcolumns.emplace(ref, it, inserted);
auto & subcolumn = it->getMapped();
if (inserted)
subcolumn = value_column.cloneEmpty()->cloneResized(row);
/// Map can have duplicated keys. We insert only the first one.
if (subcolumn->size() == row)
subcolumn->insertFrom(value_column, i);
}
/// Insert default values for keys missed in current row.
for (const auto & [_, subcolumn] : subcolumns)
if (subcolumn->size() == row)
subcolumn->insertDefault();
}
auto column_object = ColumnObject::create(has_nullable_subcolumns);
for (auto && [key, subcolumn] : subcolumns)
{
PathInData path(key.toView());
column_object->addSubcolumn(path, std::move(subcolumn));
}
return column_object;
};
}
WrapperType createObjectWrapper(const DataTypePtr & from_type, const DataTypeObject * to_type) const
{
if (const auto * from_tuple = checkAndGetDataType<DataTypeTuple>(from_type.get()))
{
if (!from_tuple->haveExplicitNames())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_type->getName());
PathsInData paths;
DataTypes from_types;
std::tie(paths, from_types) = flattenTuple(from_type);
auto to_types = from_types;
for (auto & type : to_types)
{
if (isTuple(type) || isNested(type))
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten Named Tuple. Got: {}", from_type->getName());
type = recursiveRemoveLowCardinality(type);
}
return [element_wrappers = getElementWrappers(from_types, to_types),
has_nullable_subcolumns = to_type->hasNullableSubcolumns(), from_types, to_types, paths]
(ColumnsWithTypeAndName & arguments, const DataTypePtr &, const ColumnNullable * nullable_source, size_t input_rows_count)
{
size_t tuple_size = to_types.size();
auto flattened_column = flattenTuple(arguments.front().column);
const auto & column_tuple = assert_cast<const ColumnTuple &>(*flattened_column);
if (tuple_size != column_tuple.getColumns().size())
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Expected tuple with {} subcolumn, but got {} subcolumns",
tuple_size, column_tuple.getColumns().size());
auto res = ColumnObject::create(has_nullable_subcolumns);
for (size_t i = 0; i < tuple_size; ++i)
{
ColumnsWithTypeAndName element = {{column_tuple.getColumns()[i], from_types[i], "" }};
auto converted_column = element_wrappers[i](element, to_types[i], nullable_source, input_rows_count);
res->addSubcolumn(paths[i], converted_column->assumeMutable());
}
return res;
};
return createTupleToObjectWrapper(*from_tuple, to_type->hasNullableSubcolumns());
}
else if (const auto * from_map = checkAndGetDataType<DataTypeMap>(from_type.get()))
{
return createMapToObjectWrapper(*from_map, to_type->hasNullableSubcolumns());
}
else if (checkAndGetDataType<DataTypeString>(from_type.get()))
{
@ -3199,7 +3286,7 @@ private:
}
throw Exception(ErrorCodes::TYPE_MISMATCH,
"Cast to Object can be performed only from flatten named tuple or string. Got: {}", from_type->getName());
"Cast to Object can be performed only from flatten named Tuple, Map or String. Got: {}", from_type->getName());
}
template <typename FieldType>

View File

@ -43,6 +43,9 @@ public:
for (size_t i = 2; i < args.size() - 1; i += 2)
dst_array_types.push_back(args[i]);
// Type of the ELSE branch
dst_array_types.push_back(args.back());
return getLeastSupertype(dst_array_types);
}

155
src/Functions/makeDate.cpp Normal file
View File

@ -0,0 +1,155 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Interpreters/castColumn.h>
#include <Common/DateLUT.h>
#include <Common/typeid_cast.h>
#include <array>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
{
// A helper function to simplify comparisons of valid YYYY-MM-DD values for <,>,=
inline constexpr Int64 YearMonthDayToSingleInt(Int64 year, Int64 month, Int64 day)
{
return year * 512 + month * 32 + day;
}
// Common implementation for makeDate, makeDate32
template <typename Traits>
class FunctionMakeDate : public IFunction
{
private:
static constexpr std::array<const char*, 3> argument_names = {"year", "month", "day"};
public:
static constexpr auto name = Traits::name;
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionMakeDate>(); }
String getName() const override { return name; }
bool isVariadic() const override { return false; }
size_t getNumberOfArguments() const override { return argument_names.size(); }
bool isInjective(const ColumnsWithTypeAndName &) const override
{
return false; // {year,month,day} that are out of supported range are converted into a default value
}
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
bool useDefaultImplementationForNulls() const override { return true; }
bool useDefaultImplementationForConstants() const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
if (arguments.size() != argument_names.size())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} requires 3 arguments, but {} given", getName(), arguments.size());
for (size_t i = 0; i < argument_names.size(); ++i)
{
DataTypePtr argument_type = arguments[i];
if (!isNumber(argument_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument '{}' for function {} must be number", std::string(argument_names[i]), getName());
}
return std::make_shared<typename Traits::ReturnDataType>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const DataTypePtr converted_argument_type = std::make_shared<DataTypeFloat32>();
Columns converted_arguments;
converted_arguments.reserve(arguments.size());
for (const auto & argument : arguments)
{
ColumnPtr argument_column = castColumn(argument, converted_argument_type);
argument_column = argument_column->convertToFullColumnIfConst();
converted_arguments.push_back(argument_column);
}
auto res_column = Traits::ReturnColumnType::create(input_rows_count);
auto & result_data = res_column->getData();
const auto & year_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[0]).getData();
const auto & month_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[1]).getData();
const auto & day_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[2]).getData();
const auto & date_lut = DateLUT::instance();
for (size_t i = 0; i < input_rows_count; ++i)
{
const auto year = year_data[i];
const auto month = month_data[i];
const auto day = day_data[i];
Int32 day_num = 0;
if (year >= Traits::MIN_YEAR &&
year <= Traits::MAX_YEAR &&
month >= 1 && month <= 12 &&
day >= 1 && day <= 31 &&
YearMonthDayToSingleInt(year, month, day) <= Traits::MAX_DATE)
{
day_num = date_lut.makeDayNum(year, month, day);
}
result_data[i] = day_num;
}
return res_column;
}
};
// makeDate(year, month, day)
struct MakeDateTraits
{
static constexpr auto name = "makeDate";
using ReturnDataType = DataTypeDate;
using ReturnColumnType = ColumnUInt16;
static constexpr auto MIN_YEAR = 1970;
static constexpr auto MAX_YEAR = 2149;
// This date has the maximum day number that fits in 16-bit uint
static constexpr auto MAX_DATE = YearMonthDayToSingleInt(MAX_YEAR, 6, 6);
};
// makeDate32(year, month, day)
struct MakeDate32Traits
{
static constexpr auto name = "makeDate32";
using ReturnDataType = DataTypeDate32;
using ReturnColumnType = ColumnInt32;
static constexpr auto MIN_YEAR = 1925;
static constexpr auto MAX_YEAR = 2283;
static constexpr auto MAX_DATE = YearMonthDayToSingleInt(MAX_YEAR, 11, 11);
};
}
void registerFunctionsMakeDate(FunctionFactory & factory)
{
factory.registerFunction<FunctionMakeDate<MakeDateTraits>>();
factory.registerFunction<FunctionMakeDate<MakeDate32Traits>>();
}
}
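A reading aid, not part of the diff: YearMonthDayToSingleInt packs the day with factor 1, the month with factor 32 and the year with factor 512, so the fields never overlap (day <= 31, month <= 12) and packed values compare in calendar order; a single comparison against Traits::MAX_DATE therefore bounds the whole (year, month, day) triple at once. A self-contained sketch of the same arithmetic (the pack name and the printed values are illustrative only):
#include <cstdint>
#include <iostream>
// Same packing as YearMonthDayToSingleInt above.
constexpr std::int64_t pack(std::int64_t year, std::int64_t month, std::int64_t day)
{
    return year * 512 + month * 32 + day;
}
static_assert(pack(2149, 6, 5) < pack(2149, 6, 6));   // same month, earlier day
static_assert(pack(2149, 6, 6) < pack(2149, 7, 1));   // later month wins regardless of day
static_assert(pack(2148, 12, 31) < pack(2149, 1, 1)); // later year wins regardless of month and day
int main()
{
    // 2149-06-06 is day number 65535 counted from 1970-01-01, the largest value a UInt16-backed
    // Date can hold, which is why MakeDateTraits::MAX_DATE is the packed form of (2149, 6, 6).
    std::cout << "makeDate upper bound packs to " << pack(2149, 6, 6) << '\n';
    std::cout << "2149-06-07 packs to " << pack(2149, 6, 7) << " and is rejected, yielding the default value\n";
    return 0;
}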

View File

@ -8,6 +8,7 @@ namespace DB
void registerFunctionsArithmetic(FunctionFactory &);
void registerFunctionsArray(FunctionFactory &);
void registerFunctionsTuple(FunctionFactory &);
void registerFunctionsMakeDate(FunctionFactory &);
void registerFunctionsMap(FunctionFactory &);
void registerFunctionsBitmap(FunctionFactory &);
void registerFunctionsBinaryRepr(FunctionFactory &);
@ -73,6 +74,7 @@ void registerFunctions()
registerFunctionsArithmetic(factory);
registerFunctionsArray(factory);
registerFunctionsTuple(factory);
registerFunctionsMakeDate(factory);
registerFunctionsMap(factory);
registerFunctionsBitmap(factory);
registerFunctionsBinaryRepr(factory);

View File

@ -39,6 +39,10 @@ public:
{
}
virtual ~ReadBufferFromFileDescriptor() override
{
}
int getFD() const
{
return fd;
@ -80,6 +84,9 @@ public:
{
use_pread = true;
}
virtual ~ReadBufferFromFileDescriptorPRead() override
{
}
};
}

View File

@ -15,4 +15,11 @@ public:
explicit ReadBufferFromString(std::string_view s) : ReadBufferFromMemory(s.data(), s.size()) {}
};
class ReadBufferFromOwnString : public String, public ReadBufferFromString
{
public:
explicit ReadBufferFromOwnString(const String & s_): String(s_), ReadBufferFromString(*this) {}
};
}

View File

@ -663,7 +663,7 @@ public:
Range range{from, to};
from = to;
return std::move(range);
return range;
}
private:
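A note on the one-line change above, not part of the diff: when a named local is returned by value, plain `return range;` lets the compiler construct the result directly in the caller's storage (NRVO) and otherwise still moves it implicitly, while `return std::move(range);` blocks that elision and is flagged by warnings such as -Wpessimizing-move. A minimal self-contained illustration with invented names:
#include <string>
#include <utility>
struct Range { std::string dump; };
// Plain return: eligible for NRVO; if elision does not happen, the local is still moved.
Range makeRangeGood()
{
    Range r{"[from, to)"};
    return r;
}
// Explicit std::move: never better, disables NRVO, and modern compilers warn about it.
Range makeRangePessimized()
{
    Range r{"[from, to)"};
    return std::move(r);
}
int main()
{
    return makeRangeGood().dump == makeRangePessimized().dump ? 0 : 1;
}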

View File

@ -2,18 +2,15 @@
#include <iostream>
#include <base/types.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <base/types.h>
int main(int, char **)
int readAndPrint(DB::ReadBuffer & in)
{
try
{
std::string s = "-123456 123.456 вася пе\\tтя\t'\\'xyz\\\\'";
DB::ReadBufferFromString in(s);
DB::Int64 a;
DB::Float64 b;
DB::String c, d;
@ -31,12 +28,32 @@ int main(int, char **)
std::cout << a << ' ' << b << ' ' << c << '\t' << '\'' << d << '\'' << std::endl;
std::cout << in.count() << std::endl;
return 0;
}
catch (const DB::Exception & e)
{
std::cerr << e.what() << ", " << e.displayText() << std::endl;
return 1;
}
}
int main(int, char **)
{
{
std::string s = "-123456 123.456 вася пе\\tтя\t'\\'xyz\\\\'";
DB::ReadBufferFromString in(s);
if (readAndPrint(in))
std::cout << "readAndPrint from ReadBufferFromString failed" << std::endl;
}
std::shared_ptr<DB::ReadBufferFromOwnString> in;
{
std::string s = "-123456 123.456 вася пе\\tтя\t'\\'xyz\\\\'";
in = std::make_shared<DB::ReadBufferFromOwnString>(s);
}
if (readAndPrint(*in))
std::cout << "readAndPrint from ReadBufferFromOwnString failed" << std::endl;
return 0;
}

View File

@ -32,6 +32,7 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event AsyncInsertQuery;
extern const Event AsyncInsertBytes;
}
namespace DB
@ -222,7 +223,9 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
if (!data)
data = std::make_unique<InsertData>();
data->size += entry->bytes.size();
size_t entry_data_size = entry->bytes.size();
data->size += entry_data_size;
data->last_update = std::chrono::steady_clock::now();
data->entries.emplace_back(entry);
@ -239,6 +242,7 @@ void AsynchronousInsertQueue::pushImpl(InsertData::EntryPtr entry, QueueIterator
CurrentMetrics::add(CurrentMetrics::PendingAsyncInsert);
ProfileEvents::increment(ProfileEvents::AsyncInsertQuery);
ProfileEvents::increment(ProfileEvents::AsyncInsertBytes, entry_data_size);
}
void AsynchronousInsertQueue::waitForProcessingQuery(const String & query_id, const Milliseconds & timeout)

View File

@ -12,6 +12,7 @@
#include <Storages/MarkCache.h>
#include <Storages/StorageMergeTree.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
#include <IO/UncompressedCache.h>
#include <IO/MMappedFileCache.h>
#include <IO/ReadHelpers.h>
@ -607,6 +608,15 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
}
#if USE_ROCKSDB
{
if (auto metadata_cache = getContext()->tryGetMergeTreeMetadataCache())
{
new_values["MergeTreeMetadataCacheSize"] = metadata_cache->getEstimateNumKeys();
}
}
#endif
#if USE_EMBEDDED_COMPILER
{
if (auto * compiled_expression_cache = CompiledExpressionCacheFactory::instance().tryGetCache())
@ -617,6 +627,7 @@ void AsynchronousMetrics::update(std::chrono::system_clock::time_point update_ti
}
#endif
new_values["Uptime"] = getContext()->getUptimeSeconds();
/// Process process memory usage according to OS

View File

@ -85,11 +85,15 @@
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeBackgroundExecutor.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
#include <Interpreters/SynonymsExtensions.h>
#include <Interpreters/Lemmatizers.h>
#include <Interpreters/ClusterDiscovery.h>
#include <filesystem>
#if USE_ROCKSDB
#include <rocksdb/table.h>
#endif
namespace fs = std::filesystem;
@ -276,6 +280,11 @@ struct ContextSharedPart
Context::ConfigReloadCallback config_reload_callback;
#if USE_ROCKSDB
/// Global merge tree metadata cache, stored in rocksdb.
MergeTreeMetadataCachePtr merge_tree_metadata_cache;
#endif
ContextSharedPart()
: access_control(std::make_unique<AccessControl>())
, global_overcommit_tracker(&process_list)
@ -410,6 +419,15 @@ struct ContextSharedPart
trace_collector.reset();
/// Stop zookeeper connection
zookeeper.reset();
#if USE_ROCKSDB
/// Shutdown merge tree metadata cache
if (merge_tree_metadata_cache)
{
merge_tree_metadata_cache->shutdown();
merge_tree_metadata_cache.reset();
}
#endif
}
/// Can be removed w/o context lock
@ -2048,6 +2066,23 @@ zkutil::ZooKeeperPtr Context::getAuxiliaryZooKeeper(const String & name) const
return zookeeper->second;
}
#if USE_ROCKSDB
MergeTreeMetadataCachePtr Context::getMergeTreeMetadataCache() const
{
auto cache = tryGetMergeTreeMetadataCache();
if (!cache)
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"Merge tree metadata cache is not initialized, please add config merge_tree_metadata_cache in config.xml and restart");
return cache;
}
MergeTreeMetadataCachePtr Context::tryGetMergeTreeMetadataCache() const
{
return shared->merge_tree_metadata_cache;
}
#endif
void Context::resetZooKeeper() const
{
std::lock_guard lock(shared->zookeeper_mutex);
@ -2291,6 +2326,13 @@ void Context::initializeTraceCollector()
shared->initializeTraceCollector(getTraceLog());
}
#if USE_ROCKSDB
void Context::initializeMergeTreeMetadataCache(const String & dir, size_t size)
{
shared->merge_tree_metadata_cache = MergeTreeMetadataCache::create(dir, size);
}
#endif
bool Context::hasTraceCollector() const
{
return shared->hasTraceCollector();
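For orientation, not part of the diff: initializeMergeTreeMetadataCache is presumably called once during server startup when the merge_tree_metadata_cache section named in the exception text above is present in config.xml. The helper below is only a hedged sketch of that wiring; the function name, the path and lru_cache_size keys, and the default values are assumptions, not taken from this commit.
#include <Poco/Util/AbstractConfiguration.h>
#include <Interpreters/Context.h>
/// Hypothetical startup helper; real server code may read different keys.
void initializeMergeTreeMetadataCacheFromConfig(
    const Poco::Util::AbstractConfiguration & config, const std::string & data_path, DB::ContextMutablePtr context)
{
#if USE_ROCKSDB
    if (config.has("merge_tree_metadata_cache"))
    {
        auto path = config.getString("merge_tree_metadata_cache.path", data_path + "rocksdb/");
        auto size = config.getUInt64("merge_tree_metadata_cache.lru_cache_size", 256 * 1024 * 1024);
        context->initializeMergeTreeMetadataCache(path, size);
    }
#endif
}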

View File

@ -16,6 +16,7 @@
#include <base/types.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include "config_core.h"
#include <boost/container/flat_set.hpp>
@ -152,6 +153,12 @@ using ReadTaskCallback = std::function<String()>;
using MergeTreeReadTaskCallback = std::function<std::optional<PartitionReadResponse>(PartitionReadRequest)>;
#if USE_ROCKSDB
class MergeTreeMetadataCache;
using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
#endif
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.
struct IHostContext
@ -179,6 +186,7 @@ private:
std::unique_ptr<ContextSharedPart> shared;
};
/** A set of known objects that can be used in the query.
* Consists of a shared part (always common to all sessions and queries)
* and copied part (which can be its own for each session or query).
@ -680,6 +688,11 @@ public:
UInt32 getZooKeeperSessionUptime() const;
#if USE_ROCKSDB
MergeTreeMetadataCachePtr getMergeTreeMetadataCache() const;
MergeTreeMetadataCachePtr tryGetMergeTreeMetadataCache() const;
#endif
#if USE_NURAFT
std::shared_ptr<KeeperDispatcher> & getKeeperDispatcher() const;
#endif
@ -769,6 +782,10 @@ public:
/// Call after initialization before using trace collector.
void initializeTraceCollector();
#if USE_ROCKSDB
void initializeMergeTreeMetadataCache(const String & dir, size_t size);
#endif
bool hasTraceCollector() const;
/// Nullptr if the query log is not ready for this moment.

View File

@ -100,20 +100,9 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
{
auto columns = select_query->select()->children;
const auto * group_by_expr_with_alias = dynamic_cast<const ASTWithAlias *>(argument.get());
if (group_by_expr_with_alias && !group_by_expr_with_alias->alias.empty())
{
for (const auto & column : columns)
{
const auto * col_with_alias = dynamic_cast<const ASTWithAlias *>(column.get());
if (col_with_alias)
{
const auto & alias = col_with_alias->alias;
if (!alias.empty() && alias == group_by_expr_with_alias->alias)
return false;
}
}
}
const auto * expr_with_alias = dynamic_cast<const ASTWithAlias *>(argument.get());
if (expr_with_alias && !expr_with_alias->alias.empty())
return false;
const auto * ast_literal = typeid_cast<const ASTLiteral *>(argument.get());
if (!ast_literal)
@ -130,7 +119,7 @@ bool checkPositionalArguments(ASTPtr & argument, const ASTSelectQuery * select_q
pos, columns.size());
const auto & column = columns[--pos];
if (typeid_cast<const ASTIdentifier *>(column.get()))
if (typeid_cast<const ASTIdentifier *>(column.get()) || typeid_cast<const ASTLiteral *>(column.get()))
{
argument = column->clone();
}
@ -1324,7 +1313,9 @@ ActionsDAGPtr SelectQueryExpressionAnalyzer::appendOrderBy(ExpressionActionsChai
throw Exception("Bad ORDER BY expression AST", ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE);
if (getContext()->getSettingsRef().enable_positional_arguments)
{
replaceForPositionalArguments(ast->children.at(0), select_query, ASTSelectQuery::Expression::ORDER_BY);
}
}
getRootActions(select_query->orderBy(), only_types, step.actions());

View File

@ -962,18 +962,29 @@ public:
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
{
const auto & column = *block.getByPosition(right_indexes[j]).column;
if (auto * nullable_col = typeid_cast<ColumnNullable *>(columns[j].get()); nullable_col && !column.isNullable())
nullable_col->insertFromNotNullable(column, row_num);
auto column_from_block = block.getByPosition(right_indexes[j]);
if (type_name[j].type->lowCardinality() != column_from_block.type->lowCardinality())
{
JoinCommon::changeLowCardinalityInplace(column_from_block);
}
if (auto * nullable_col = typeid_cast<ColumnNullable *>(columns[j].get());
nullable_col && !column_from_block.column->isNullable())
nullable_col->insertFromNotNullable(*column_from_block.column, row_num);
else
columns[j]->insertFrom(column, row_num);
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
else
{
for (size_t j = 0, size = right_indexes.size(); j < size; ++j)
{
columns[j]->insertFrom(*block.getByPosition(right_indexes[j]).column, row_num);
auto column_from_block = block.getByPosition(right_indexes[j]);
if (type_name[j].type->lowCardinality() != column_from_block.type->lowCardinality())
{
JoinCommon::changeLowCardinalityInplace(column_from_block);
}
columns[j]->insertFrom(*column_from_block.column, row_num);
}
}
}
@ -1013,6 +1024,7 @@ private:
void addColumn(const ColumnWithTypeAndName & src_column, const std::string & qualified_name)
{
columns.push_back(src_column.column->cloneEmpty());
columns.back()->reserve(src_column.column->size());
type_name.emplace_back(src_column.type, src_column.name, qualified_name);
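The reserve() call added to addColumn above pre-sizes the destination column: joined rows are appended one at a time through insertFrom, so reserving the expected size once avoids repeated geometric reallocation. A standalone sketch of the pattern, compiled against the ClickHouse tree; the row count is arbitrary:
#include <Columns/ColumnsNumber.h>
#include <iostream>
int main()
{
    // Build a source column with some rows.
    auto source = DB::ColumnUInt64::create();
    for (DB::UInt64 i = 0; i < 100000; ++i)
        source->insertValue(i);
    // Clone an empty column of the same type, reserve once, then copy row by row.
    auto destination = source->cloneEmpty();
    destination->reserve(source->size());
    for (size_t row = 0; row < source->size(); ++row)
        destination->insertFrom(*source, row);
    std::cout << destination->size() << " rows copied" << std::endl;
    return 0;
}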

View File

@ -358,6 +358,7 @@ BlockIO InterpreterInsertQuery::execute()
auto new_context = Context::createCopy(context);
new_context->setSettings(new_settings);
new_context->setInsertionTable(getContext()->getInsertionTable());
InterpreterSelectWithUnionQuery interpreter_select{
query.select, new_context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};

View File

@ -150,12 +150,12 @@ static ColumnsDescription createColumnsDescription(const NamesAndTypesList & col
ColumnsDescription columns_description;
for (
auto [column_name_and_type, declare_column_ast] = std::tuple{columns_name_and_type.begin(), columns_definition->children.begin()};
column_name_and_type != columns_name_and_type.end();
column_name_and_type++,
declare_column_ast++
)
/// FIXME: we could write it like auto [a, b] = std::tuple(x, y),
/// but this produces endless recursion in gcc-11 and leads to SIGSEGV
/// (see git blame for details).
auto column_name_and_type = columns_name_and_type.begin();
auto declare_column_ast = columns_definition->children.begin();
for (; column_name_and_type != columns_name_and_type.end(); column_name_and_type++, declare_column_ast++)
{
const auto & declare_column = (*declare_column_ast)->as<MySQLParser::ASTDeclareColumn>();
String comment;

View File

@ -351,15 +351,6 @@ public:
max_size = max_size_;
}
// Before calling this method you should be sure
// that lock is acquired.
template <typename F>
void processEachQueryStatus(F && func) const
{
for (auto && query : processes)
func(query);
}
void setMaxInsertQueriesAmount(size_t max_insert_queries_amount_)
{
std::lock_guard lock(mutex);

View File

@ -192,7 +192,7 @@ private:
using Result = Element;
static TKey & extractKey(Element & elem) { return elem.value; }
static Element extractResult(Element & elem) { return elem; }
static Result extractResult(Element & elem) { return elem; }
};
if constexpr (is_descending)

View File

@ -379,7 +379,7 @@ void SystemLog<LogElement>::flushImpl(const std::vector<LogElement> & to_flush,
for (const auto & name_and_type : log_element_names_and_types)
log_element_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(log_element_columns));
Block block(log_element_columns);
MutableColumns columns = block.mutateColumns();
for (const auto & elem : to_flush)

View File

@ -345,7 +345,10 @@ void replaceWithSumCount(String column_name, ASTFunction & func)
{
/// Rewrite "avg" to sumCount().1 / sumCount().2
auto new_arg1 = makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(1)));
auto new_arg2 = makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(2)));
auto new_arg2 = makeASTFunction("CAST",
makeASTFunction("tupleElement", func_base, std::make_shared<ASTLiteral>(UInt8(2))),
std::make_shared<ASTLiteral>("Float64"));
func.name = "divide";
exp_list->children.push_back(new_arg1);
exp_list->children.push_back(new_arg2);

View File

@ -607,6 +607,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
if (async_insert)
{
quota = context->getQuota();
if (quota)
{
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
quota->checkExceeded(QuotaType::ERRORS);
}
queue->push(ast, context);
if (settings.wait_for_async_insert)
@ -617,13 +625,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
res.pipeline = QueryPipeline(Pipe(std::move(source)));
}
quota = context->getQuota();
if (quota)
{
quota->used(QuotaType::QUERY_INSERTS, 1);
quota->used(QuotaType::QUERIES, 1);
}
const auto & table_id = insert_query->table_id;
if (!table_id.empty())
context->setInsertionTable(table_id);
@ -656,6 +657,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
limits.size_limits = SizeLimits(settings.max_result_rows, settings.max_result_bytes, settings.result_overflow_mode);
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
}
{
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
@ -666,14 +675,6 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
res = interpreter->execute();
}
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
auto table_id = insert_interpreter->getDatabaseTable();
if (!table_id.empty())
context->setInsertionTable(std::move(table_id));
}
}
if (process_list_entry)

View File

@ -326,9 +326,10 @@ ColumnRawPtrMap materializeColumnsInplaceMap(Block & block, const Names & names)
for (const auto & column_name : names)
{
auto & column = block.getByName(column_name).column;
column = recursiveRemoveLowCardinality(column->convertToFullColumnIfConst());
ptrs[column_name] = column.get();
auto & column = block.getByName(column_name);
column.column = recursiveRemoveLowCardinality(column.column->convertToFullColumnIfConst());
column.type = recursiveRemoveLowCardinality(column.type);
ptrs[column_name] = column.column.get();
}
return ptrs;

View File

@ -25,7 +25,7 @@ CallbackRunner threadPoolCallbackRunner(ThreadPool & pool)
/// Usually it could be ok, because thread pool task is executed before user-level memory tracker is destroyed.
/// However, the thread could stay alive inside the thread pool, and its ThreadStatus as well.
/// When, finally, we destroy the thread (and the ThreadStatus),
/// it can use memory tracker in the ~ThreadStatus in order to alloc/free untracked_memory,\
/// it can use memory tracker in the ~ThreadStatus in order to alloc/free untracked_memory,
/// and by this time user-level memory tracker may be already destroyed.
///
/// As a work-around, reset memory tracker to total, which is always alive.

View File

@ -18,6 +18,10 @@ public:
virtual NamesAndTypesList readSchema() = 0;
/// True if order of columns is important in format.
/// Exceptions: JSON, TSKV.
virtual bool hasStrictOrderOfColumns() const { return true; }
virtual ~ISchemaReader() = default;
protected:
@ -60,6 +64,7 @@ class IRowWithNamesSchemaReader : public ISchemaReader
public:
IRowWithNamesSchemaReader(ReadBuffer & in_, size_t max_rows_to_read_, DataTypePtr default_type_ = nullptr);
NamesAndTypesList readSchema() override;
bool hasStrictOrderOfColumns() const override { return false; }
protected:
/// Read one row and determine types of columns in it.

View File

@ -228,6 +228,14 @@ void registerNonTrivialPrefixAndSuffixCheckerJSONAsString(FormatFactory & factor
factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsString", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
void registerJSONAsStringSchemaReader(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsString", [](const FormatSettings &)
{
return std::make_shared<JSONAsStringExternalSchemaReader>();
});
}
void registerInputFormatJSONAsObject(FormatFactory & factory)
{
factory.registerInputFormat("JSONAsObject", [](
@ -245,11 +253,16 @@ void registerNonTrivialPrefixAndSuffixCheckerJSONAsObject(FormatFactory & factor
factory.registerNonTrivialPrefixAndSuffixChecker("JSONAsObject", nonTrivialPrefixAndSuffixCheckerJSONEachRowImpl);
}
void registerJSONAsStringSchemaReader(FormatFactory & factory)
void registerFileSegmentationEngineJSONAsObject(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsString", [](const FormatSettings &)
factory.registerFileSegmentationEngine("JSONAsObject", &fileSegmentationEngineJSONEachRow);
}
void registerJSONAsObjectSchemaReader(FormatFactory & factory)
{
factory.registerExternalSchemaReader("JSONAsObject", [](const FormatSettings &)
{
return std::make_shared<JSONAsStringExternalSchemaReader>();
return std::make_shared<JSONAsObjectExternalSchemaReader>();
});
}

View File

@ -5,6 +5,7 @@
#include <Formats/FormatFactory.h>
#include <IO/PeekableReadBuffer.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeObject.h>
namespace DB
{
@ -73,4 +74,13 @@ public:
}
};
class JSONAsObjectExternalSchemaReader : public IExternalSchemaReader
{
public:
NamesAndTypesList readSchema() override
{
return {{"json", std::make_shared<DataTypeObject>("json", false)}};
}
};
}

View File

@ -1,16 +1,22 @@
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/WriteBuffer.h>
#include <IO/Operators.h>
#include <stack>
#include <Common/JSONBuilder.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ArrayJoinAction.h>
#include <stack>
#include <IO/Operators.h>
#include <IO/WriteBuffer.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/QueryPlan/ReadFromMergeTree.h>
#include <Common/JSONBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB
{
@ -388,6 +394,7 @@ void QueryPlan::explainPlan(WriteBuffer & buffer, const ExplainPlanOptions & opt
static void explainPipelineStep(IQueryPlanStep & step, IQueryPlanStep::FormatSettings & settings)
{
settings.out << String(settings.offset, settings.indent_char) << "(" << step.getName() << ")\n";
size_t current_offset = settings.offset;
step.describePipeline(settings);
if (current_offset == settings.offset)

View File

@ -112,6 +112,9 @@ ReadFromMergeTree::ReadFromMergeTree(
if (enable_parallel_reading)
read_task_callback = context->getMergeTreeReadTaskCallback();
/// Add explicit description.
setStepDescription(data.getStorageID().getFullNameNotQuoted());
}
Pipe ReadFromMergeTree::readFromPool(

View File

@ -100,7 +100,8 @@ public:
bool enable_parallel_reading
);
String getName() const override { return "ReadFromMergeTree"; }
static constexpr auto name = "ReadFromMergeTree";
String getName() const override { return name; }
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;

View File

@ -488,7 +488,7 @@ auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int64_t offset)
}
}
return std::tuple{x, offset};
return std::tuple<RowNumber, int64_t>{x, offset};
}
auto WindowTransform::moveRowNumber(const RowNumber & _x, int64_t offset) const
@ -505,7 +505,7 @@ auto WindowTransform::moveRowNumber(const RowNumber & _x, int64_t offset) const
assert(oo == 0);
#endif
return std::tuple{x, o};
return std::tuple<RowNumber, int64_t>{x, o};
}

View File

@ -122,7 +122,7 @@ void ColumnDescription::readText(ReadBuffer & buf)
if (col_ast->default_expression)
{
default_desc.kind = columnDefaultKindFromString(col_ast->default_specifier);
default_desc.expression = std::move(col_ast->default_expression);
default_desc.expression = col_ast->default_expression;
}
if (col_ast->comment)

View File

@ -325,6 +325,7 @@ void URLBasedDataSourceConfiguration::set(const URLBasedDataSourceConfiguration
compression_method = conf.compression_method;
structure = conf.structure;
http_method = conf.http_method;
headers = conf.headers;
}
@ -364,6 +365,10 @@ std::optional<URLBasedDataSourceConfig> getURLBasedDataSourceConfiguration(const
{
configuration.structure = config.getString(config_prefix + ".structure", "");
}
else if (key == "compression_method")
{
configuration.compression_method = config.getString(config_prefix + ".compression_method", "");
}
else if (key == "headers")
{
Poco::Util::AbstractConfiguration::Keys header_keys;

View File

@ -114,6 +114,12 @@ struct StorageS3Configuration : URLBasedDataSourceConfiguration
String secret_access_key;
};
struct StorageS3ClusterConfiguration : StorageS3Configuration
{
String cluster_name;
};
struct URLBasedDataSourceConfig
{
URLBasedDataSourceConfiguration configuration;

View File

@ -1,9 +1,11 @@
#include "IMergeTreeDataPart.h"
#include <optional>
#include <boost/algorithm/string/join.hpp>
#include <string_view>
#include <Core/Defines.h>
#include <IO/HashingWriteBuffer.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
@ -11,6 +13,8 @@
#include <Storages/MergeTree/localBackup.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Storages/MergeTree/PartMetadataManagerWithCache.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/ZooKeeper/ZooKeeper.h>
@ -61,13 +65,7 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
size_t file_size = disk->getFileSize(path);
return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
}
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path)
void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const PartMetadataManagerPtr & manager)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
@ -79,8 +77,8 @@ void IMergeTreeDataPart::MinMaxIndex::load(const MergeTreeData & data, const Dis
hyperrectangle.reserve(minmax_idx_size);
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = fs::path(part_path) / ("minmax_" + escapeForFileName(minmax_column_names[i]) + ".idx");
auto file = openForReading(disk_, file_name);
String file_name = "minmax_" + escapeForFileName(minmax_column_names[i]) + ".idx";
auto file = manager->read(file_name);
auto serialization = minmax_column_types[i]->getDefaultSerialization();
Field min_val;
@ -192,6 +190,19 @@ void IMergeTreeDataPart::MinMaxIndex::merge(const MinMaxIndex & other)
}
}
void IMergeTreeDataPart::MinMaxIndex::appendFiles(const MergeTreeData & data, Strings & files)
{
auto metadata_snapshot = data.getInMemoryMetadataPtr();
const auto & partition_key = metadata_snapshot->getPartitionKey();
auto minmax_column_names = data.getMinMaxColumnsNames(partition_key);
size_t minmax_idx_size = minmax_column_names.size();
for (size_t i = 0; i < minmax_idx_size; ++i)
{
String file_name = "minmax_" + escapeForFileName(minmax_column_names[i]) + ".idx";
files.push_back(file_name);
}
}
static void incrementStateMetric(IMergeTreeDataPart::State state)
{
@ -299,6 +310,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
, parent_part(parent_part_)
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
@ -306,6 +318,8 @@ IMergeTreeDataPart::IMergeTreeDataPart(
incrementTypeMetric(part_type);
minmax_idx = std::make_shared<MinMaxIndex>();
initializePartMetadataManager();
}
IMergeTreeDataPart::IMergeTreeDataPart(
@ -324,6 +338,7 @@ IMergeTreeDataPart::IMergeTreeDataPart(
, index_granularity_info(storage_, part_type_)
, part_type(part_type_)
, parent_part(parent_part_)
, use_metadata_cache(storage.use_metadata_cache)
{
if (parent_part)
state = State::Active;
@ -331,6 +346,8 @@ IMergeTreeDataPart::IMergeTreeDataPart(
incrementTypeMetric(part_type);
minmax_idx = std::make_shared<MinMaxIndex>();
initializePartMetadataManager();
}
IMergeTreeDataPart::~IMergeTreeDataPart()
@ -637,6 +654,33 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
loadDefaultCompressionCodec();
}
void IMergeTreeDataPart::appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection) const
{
if (isStoredOnDisk())
{
appendFilesOfUUID(files);
appendFilesOfColumns(files);
appendFilesOfChecksums(files);
appendFilesOfIndexGranularity(files);
appendFilesOfIndex(files);
appendFilesOfRowsCount(files);
appendFilesOfPartitionAndMinMaxIndex(files);
appendFilesOfTTLInfos(files);
appendFilesOfDefaultCompressionCodec(files);
}
if (!parent_part && include_projection)
{
for (const auto & [projection_name, projection_part] : projection_parts)
{
Strings projection_files;
projection_part->appendFilesOfColumnsChecksumsIndexes(projection_files, true);
for (const auto & projection_file : projection_files)
files.push_back(fs::path(projection_part->relative_path) / projection_file);
}
}
}
void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool check_consistency)
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
@ -657,6 +701,11 @@ void IMergeTreeDataPart::loadIndexGranularity()
throw Exception("Method 'loadIndexGranularity' is not implemented for part with type " + getType().toString(), ErrorCodes::NOT_IMPLEMENTED);
}
/// Currently we don't cache the mark files of a part, because caching the other metadata files is enough to speed up loading.
void IMergeTreeDataPart::appendFilesOfIndexGranularity(Strings & /* files */) const
{
}
void IMergeTreeDataPart::loadIndex()
{
/// It can be empty in case of mutations
@ -680,9 +729,9 @@ void IMergeTreeDataPart::loadIndex()
loaded_index[i]->reserve(index_granularity.getMarksCount());
}
String index_path = fs::path(getFullRelativePath()) / "primary.idx";
auto index_file = openForReading(volume->getDisk(), index_path);
String index_name = "primary.idx";
String index_path = fs::path(getFullRelativePath()) / index_name;
auto index_file = metadata_manager->read(index_name);
size_t marks_count = index_granularity.getMarksCount();
Serializations key_serializations(key_size);
@ -709,6 +758,19 @@ void IMergeTreeDataPart::loadIndex()
}
}
void IMergeTreeDataPart::appendFilesOfIndex(Strings & files) const
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (parent_part)
metadata_snapshot = metadata_snapshot->projections.has(name) ? metadata_snapshot->projections.get(name).metadata : nullptr;
if (!metadata_snapshot)
return;
if (metadata_snapshot->hasPrimaryKey())
files.push_back("primary.idx");
}
NameSet IMergeTreeDataPart::getFileNamesWithoutChecksums() const
{
if (!isStoredOnDisk())
@ -733,14 +795,14 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
}
String path = fs::path(getFullRelativePath()) / DEFAULT_COMPRESSION_CODEC_FILE_NAME;
if (!volume->getDisk()->exists(path))
bool exists = metadata_manager->exists(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
if (!exists)
{
default_codec = detectDefaultCompressionCodec();
}
else
{
auto file_buf = openForReading(volume->getDisk(), path);
auto file_buf = metadata_manager->read(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
String codec_line;
readEscapedStringUntilEOL(codec_line, *file_buf);
@ -748,7 +810,13 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
if (!checkString("CODEC", buf))
{
LOG_WARNING(storage.log, "Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced automatically, from data on disk", name, path, codec_line);
LOG_WARNING(
storage.log,
"Cannot parse default codec for part {} from file {}, content '{}'. Default compression codec will be deduced "
"automatically, from data on disk",
name,
path,
codec_line);
default_codec = detectDefaultCompressionCodec();
}
@ -766,6 +834,11 @@ void IMergeTreeDataPart::loadDefaultCompressionCodec()
}
}
void IMergeTreeDataPart::appendFilesOfDefaultCompressionCodec(Strings & files)
{
files.push_back(DEFAULT_COMPRESSION_CODEC_FILE_NAME);
}
CompressionCodecPtr IMergeTreeDataPart::detectDefaultCompressionCodec() const
{
/// In memory parts doesn't have any compression
@ -828,7 +901,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
{
String path = getFullRelativePath();
if (!parent_part)
partition.load(storage, volume->getDisk(), path);
partition.load(storage, metadata_manager);
if (!isEmpty())
{
@ -836,7 +909,7 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
// projection parts don't have minmax_idx, and it's always initialized
minmax_idx->initialized = true;
else
minmax_idx->load(storage, volume->getDisk(), path);
minmax_idx->load(storage, metadata_manager);
}
if (parent_part)
return;
@ -851,13 +924,26 @@ void IMergeTreeDataPart::loadPartitionAndMinMaxIndex()
ErrorCodes::CORRUPTED_DATA);
}
void IMergeTreeDataPart::appendFilesOfPartitionAndMinMaxIndex(Strings & files) const
{
if (storage.format_version < MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING && !parent_part)
return;
if (!parent_part)
partition.appendFiles(storage, files);
if (!isEmpty())
if (!parent_part)
minmax_idx->appendFiles(storage, files);
}
void IMergeTreeDataPart::loadChecksums(bool require)
{
const String path = fs::path(getFullRelativePath()) / "checksums.txt";
if (volume->getDisk()->exists(path))
bool exists = metadata_manager->exists("checksums.txt");
if (exists)
{
auto buf = openForReading(volume->getDisk(), path);
auto buf = metadata_manager->read("checksums.txt");
if (checksums.read(*buf))
{
assertEOF(*buf);
@ -888,13 +974,18 @@ void IMergeTreeDataPart::loadChecksums(bool require)
}
}
void IMergeTreeDataPart::appendFilesOfChecksums(Strings & files)
{
files.push_back("checksums.txt");
}
void IMergeTreeDataPart::loadRowsCount()
{
String path = fs::path(getFullRelativePath()) / "count.txt";
auto read_rows_count = [&]()
{
auto buf = openForReading(volume->getDisk(), path);
auto buf = metadata_manager->read("count.txt");
readIntText(rows_count, *buf);
assertEOF(*buf);
};
@ -905,7 +996,8 @@ void IMergeTreeDataPart::loadRowsCount()
}
else if (storage.format_version >= MERGE_TREE_DATA_MIN_FORMAT_VERSION_WITH_CUSTOM_PARTITIONING || part_type == Type::COMPACT || parent_part)
{
if (!volume->getDisk()->exists(path))
bool exists = metadata_manager->exists("count.txt");
if (!exists)
throw Exception("No count.txt in part " + name, ErrorCodes::NO_FILE_IN_DATA_PART);
read_rows_count();
@ -1003,12 +1095,17 @@ void IMergeTreeDataPart::loadRowsCount()
}
}
void IMergeTreeDataPart::appendFilesOfRowsCount(Strings & files)
{
files.push_back("count.txt");
}
void IMergeTreeDataPart::loadTTLInfos()
{
String path = fs::path(getFullRelativePath()) / "ttl.txt";
if (volume->getDisk()->exists(path))
bool exists = metadata_manager->exists("ttl.txt");
if (exists)
{
auto in = openForReading(volume->getDisk(), path);
auto in = metadata_manager->read("ttl.txt");
assertString("ttl format version: ", *in);
size_t format_version;
readText(format_version, *in);
@ -1030,19 +1127,29 @@ void IMergeTreeDataPart::loadTTLInfos()
}
}
void IMergeTreeDataPart::appendFilesOfTTLInfos(Strings & files)
{
files.push_back("ttl.txt");
}
void IMergeTreeDataPart::loadUUID()
{
String path = fs::path(getFullRelativePath()) / UUID_FILE_NAME;
if (volume->getDisk()->exists(path))
bool exists = metadata_manager->exists(UUID_FILE_NAME);
if (exists)
{
auto in = openForReading(volume->getDisk(), path);
auto in = metadata_manager->read(UUID_FILE_NAME);
readText(uuid, *in);
if (uuid == UUIDHelpers::Nil)
throw Exception("Unexpected empty " + String(UUID_FILE_NAME) + " in part: " + name, ErrorCodes::LOGICAL_ERROR);
}
}
void IMergeTreeDataPart::appendFilesOfUUID(Strings & files)
{
files.push_back(UUID_FILE_NAME);
}
void IMergeTreeDataPart::loadColumns(bool require)
{
String path = fs::path(getFullRelativePath()) / "columns.txt";
@ -1051,7 +1158,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
metadata_snapshot = metadata_snapshot->projections.get(name).metadata;
NamesAndTypesList loaded_columns;
if (!volume->getDisk()->exists(path))
bool exists = metadata_manager->exists("columns.txt");
if (!exists)
{
/// We can get list of columns only from columns.txt in compact parts.
if (require || part_type == Type::COMPACT)
@ -1074,7 +1182,8 @@ void IMergeTreeDataPart::loadColumns(bool require)
}
else
{
loaded_columns.readText(*volume->getDisk()->readFile(path));
auto in = metadata_manager->read("columns.txt");
loaded_columns.readText(*in);
for (const auto & column : loaded_columns)
{
@ -1091,14 +1200,23 @@ void IMergeTreeDataPart::loadColumns(bool require)
};
SerializationInfoByName infos(loaded_columns, settings);
path = getFullRelativePath() + SERIALIZATION_FILE_NAME;
if (volume->getDisk()->exists(path))
infos.readJSON(*volume->getDisk()->readFile(path));
exists = metadata_manager->exists(SERIALIZATION_FILE_NAME);
if (exists)
{
auto in = metadata_manager->read(SERIALIZATION_FILE_NAME);
infos.readJSON(*in);
}
setColumns(loaded_columns);
setSerializationInfos(infos);
}
void IMergeTreeDataPart::appendFilesOfColumns(Strings & files)
{
files.push_back("columns.txt");
files.push_back(SERIALIZATION_FILE_NAME);
}
bool IMergeTreeDataPart::shallParticipateInMerges(const StoragePolicyPtr & storage_policy) const
{
/// `IMergeTreeDataPart::volume` describes space where current part belongs, and holds
@ -1150,9 +1268,12 @@ try
}
}
metadata_manager->deleteAll(true);
metadata_manager->assertAllDeleted(true);
volume->getDisk()->setLastModified(from, Poco::Timestamp::fromEpochTime(time(nullptr)));
volume->getDisk()->moveDirectory(from, to);
relative_path = new_relative_path;
metadata_manager->updateAll(true);
SyncGuardPtr sync_guard;
if (storage.getSettings()->fsync_part_directory)
@ -1190,6 +1311,18 @@ std::optional<bool> IMergeTreeDataPart::keepSharedDataInDecoupledStorage() const
return !storage.unlockSharedData(*this);
}
void IMergeTreeDataPart::initializePartMetadataManager()
{
#if USE_ROCKSDB
if (use_metadata_cache)
metadata_manager = std::make_shared<PartMetadataManagerWithCache>(this, storage.getContext()->getMergeTreeMetadataCache());
else
metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(this);
#else
metadata_manager = std::make_shared<PartMetadataManagerOrdinary>(this);
#endif
}
void IMergeTreeDataPart::remove() const
{
std::optional<bool> keep_shared_data = keepSharedDataInDecoupledStorage();
@ -1209,6 +1342,9 @@ void IMergeTreeDataPart::remove() const
return;
}
metadata_manager->deleteAll(false);
metadata_manager->assertAllDeleted(false);
/** Atomic directory removal:
* - rename directory to temporary name;
* - remove it recursive.
@ -1314,6 +1450,9 @@ void IMergeTreeDataPart::remove() const
void IMergeTreeDataPart::projectionRemove(const String & parent_to, bool keep_shared_data) const
{
metadata_manager->deleteAll(false);
metadata_manager->assertAllDeleted(false);
String to = fs::path(parent_to) / relative_path;
auto disk = volume->getDisk();
if (checksums.empty())
@ -1661,6 +1800,35 @@ String IMergeTreeDataPart::getZeroLevelPartBlockID(std::string_view token) const
return info.partition_id + "_" + toString(hash_value.words[0]) + "_" + toString(hash_value.words[1]);
}
IMergeTreeDataPart::uint128 IMergeTreeDataPart::getActualChecksumByFile(const String & file_path) const
{
assert(use_metadata_cache);
String file_name = std::filesystem::path(file_path).filename();
const auto filenames_without_checksums = getFileNamesWithoutChecksums();
auto it = checksums.files.find(file_name);
if (filenames_without_checksums.count(file_name) == 0 && it != checksums.files.end())
{
return it->second.file_hash;
}
if (!volume->getDisk()->exists(file_path))
{
return {};
}
std::unique_ptr<ReadBufferFromFileBase> in_file = volume->getDisk()->readFile(file_path);
HashingReadBuffer in_hash(*in_file);
String value;
readStringUntilEOF(value, in_hash);
return in_hash.getHash();
}
std::unordered_map<String, IMergeTreeDataPart::uint128> IMergeTreeDataPart::checkMetadata() const
{
return metadata_manager->check();
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::COMPACT);
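A side note on getActualChecksumByFile above, not part of the diff: when a file is not covered by checksums.txt, its bytes are hashed by reading them through HashingReadBuffer. The snippet below isolates that hashing pattern using classes that already appear in this diff; the payload string is a made-up stand-in for a metadata file's contents.
#include <IO/ReadBufferFromString.h>
#include <IO/HashingReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <iostream>
int main()
{
    const std::string payload = "100\n";            /// pretend this is count.txt
    DB::ReadBufferFromString in1(payload);
    DB::HashingReadBuffer hashing_in1(in1);         /// hashes everything read through it
    std::string consumed;
    DB::readStringUntilEOF(consumed, hashing_in1);  /// drain the buffer so the whole payload is hashed
    const auto hash1 = hashing_in1.getHash();
    /// Hashing the same bytes again yields the same 128-bit checksum, the property the
    /// metadata consistency check relies on when comparing cached and on-disk files.
    DB::ReadBufferFromString in2(payload);
    DB::HashingReadBuffer hashing_in2(in2);
    DB::readStringUntilEOF(consumed, hashing_in2);
    const auto hash2 = hashing_in2.getHash();
    std::cout << (hash1 == hash2 ? "checksums match" : "checksums differ") << std::endl;
    return hash1 == hash2 ? 0 : 1;
}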

View File

@ -14,6 +14,7 @@
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <DataTypes/Serializations/SerializationInfo.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <shared_mutex>
@ -60,6 +61,8 @@ public:
using Type = MergeTreeDataPartType;
using uint128 = IPartMetadataManager::uint128;
IMergeTreeDataPart(
const MergeTreeData & storage_,
@ -148,6 +151,7 @@ public:
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load checksums from checksums.txt if exists. Load index if required.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);
void appendFilesOfColumnsChecksumsIndexes(Strings & files, bool include_projection = false) const;
String getMarksFileExtension() const { return index_granularity_info.marks_file_extension; }
@ -243,7 +247,7 @@ public:
using TTLInfo = MergeTreeDataPartTTLInfo;
using TTLInfos = MergeTreeDataPartTTLInfos;
TTLInfos ttl_infos;
mutable TTLInfos ttl_infos;
/// Current state of the part. If the part is in working set already, it should be accessed via data_parts mutex
void setState(State new_state) const;
@ -300,14 +304,16 @@ public:
{
}
void load(const MergeTreeData & data, const PartMetadataManagerPtr & manager);
using WrittenFiles = std::vector<std::unique_ptr<WriteBufferFromFileBase>>;
void load(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path);
[[nodiscard]] WrittenFiles store(const MergeTreeData & data, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
[[nodiscard]] WrittenFiles store(const Names & column_names, const DataTypes & data_types, const DiskPtr & disk_, const String & part_path, Checksums & checksums) const;
void update(const Block & block, const Names & column_names);
void merge(const MinMaxIndex & other);
static void appendFiles(const MergeTreeData & data, Strings & files);
};
using MinMaxIndexPtr = std::shared_ptr<MinMaxIndex>;
@ -429,6 +435,12 @@ public:
/// Required for distinguish different copies of the same part on remote FS.
String getUniqueId() const;
/// Get the checksum of a metadata file in the part directory
IMergeTreeDataPart::uint128 getActualChecksumByFile(const String & file_path) const;
/// Check that the metadata in the cache is consistent with the actual metadata on disk (if use_metadata_cache is true)
std::unordered_map<String, uint128> checkMetadata() const;
protected:
/// Total size of all columns, calculated once in calculateColumnsSizesOnDisk
@ -455,6 +467,11 @@ protected:
std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
/// Disabled when USE_ROCKSDB is OFF or use_metadata_cache is set to false in merge tree settings
bool use_metadata_cache = false;
mutable PartMetadataManagerPtr metadata_manager;
void removeIfNeeded();
virtual void checkConsistency(bool require_part_metadata) const;
@ -468,6 +485,9 @@ protected:
std::optional<bool> keepSharedDataInDecoupledStorage() const;
void initializePartMetadataManager();
private:
/// In compact parts order of columns is necessary
NameToNumber column_name_to_position;
@ -478,36 +498,54 @@ private:
/// Reads part unique identifier (if exists) from uuid.txt
void loadUUID();
static void appendFilesOfUUID(Strings & files);
/// Reads columns names and types from columns.txt
void loadColumns(bool require);
static void appendFilesOfColumns(Strings & files);
/// If checksums.txt exists, reads file's checksums (and sizes) from it
void loadChecksums(bool require);
static void appendFilesOfChecksums(Strings & files);
/// Loads marks index granularity into memory
virtual void loadIndexGranularity();
virtual void appendFilesOfIndexGranularity(Strings & files) const;
/// Loads index file.
void loadIndex();
void appendFilesOfIndex(Strings & files) const;
/// Load rows count for this part from disk (for the newer storage format version).
/// For the older format version calculates rows count from the size of a column with a fixed size.
void loadRowsCount();
static void appendFilesOfRowsCount(Strings & files);
/// Loads TTL infos in JSON format from the file ttl.txt. If the file doesn't exist, assigns TTL infos with all zeros
void loadTTLInfos();
static void appendFilesOfTTLInfos(Strings & files);
void loadPartitionAndMinMaxIndex();
void calculateColumnsSizesOnDisk();
void calculateSecondaryIndicesSizesOnDisk();
void appendFilesOfPartitionAndMinMaxIndex(Strings & files) const;
/// Load the default compression codec from the file default_compression_codec.txt;
/// if it does not exist, tries to deduce the codec from a compressed column without
/// any special compression.
void loadDefaultCompressionCodec();
static void appendFilesOfDefaultCompressionCodec(Strings & files);
/// Found column without specific compression and return codec
/// for this column with default parameters.
CompressionCodecPtr detectDefaultCompressionCodec() const;

View File

@ -0,0 +1,11 @@
#include "IPartMetadataManager.h"
#include <Disks/IVolume.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
IPartMetadataManager::IPartMetadataManager(const IMergeTreeDataPart * part_) : part(part_), disk(part->volume->getDisk())
{
}
}

View File

@ -0,0 +1,59 @@
#pragma once
#include <unordered_map>
#include <city.h>
#include <base/types.h>
namespace DB
{
class IMergeTreeDataPart;
class SeekableReadBuffer;
class IDisk;
using DiskPtr = std::shared_ptr<IDisk>;
/// Interface for managing the metadata of a merge tree part.
/// IPartMetadataManager has two implementations:
/// - PartMetadataManagerOrdinary: manages metadata on disk directly. deleteAll/assertAllDeleted/updateAll/check
///   are all empty implementations because they are not needed for PartMetadataManagerOrdinary (those operations
///   are done implicitly when removing or renaming the part directory).
/// - PartMetadataManagerWithCache: manages metadata via the RocksDB cache as well as disk.
class IPartMetadataManager
{
public:
using uint128 = CityHash_v1_0_2::uint128;
explicit IPartMetadataManager(const IMergeTreeDataPart * part_);
virtual ~IPartMetadataManager() = default;
/// Read a metadata file's content and return a SeekableReadBuffer over it.
virtual std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const = 0;
/// Return true if the metadata file exists in the part.
virtual bool exists(const String & file_name) const = 0;
/// Delete all metadata files in the part.
/// If include_projection is true, also delete metadata files in projection parts.
virtual void deleteAll(bool include_projection) = 0;
/// Assert that all metadata files in the part have been deleted.
/// If include_projection is true, also assert that metadata files in projection parts have been deleted.
virtual void assertAllDeleted(bool include_projection) const = 0;
/// Update all metadata files in the part.
/// If include_projection is true, also update metadata files in projection parts.
virtual void updateAll(bool include_projection) = 0;
/// Check all metadata files in the part.
virtual std::unordered_map<String, uint128> check() const = 0;
protected:
const IMergeTreeDataPart * part;
const DiskPtr disk;
};
using PartMetadataManagerPtr = std::shared_ptr<IPartMetadataManager>;
}
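To make the interface above concrete, here is a minimal sketch of what the "ordinary" implementation could look like, reusing only IDisk calls that appear elsewhere in this diff (exists, getFileSize, readFile with an adjusted buffer size). The class name carries a Sketch suffix on purpose: it is an illustration, and the real PartMetadataManagerOrdinary may differ in detail.
#include <filesystem>
#include <Disks/IDisk.h>
#include <IO/ReadSettings.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
namespace fs = std::filesystem;
namespace DB
{
/// Sketch only: metadata is read straight from the part directory, so the
/// cache-maintenance hooks are deliberately empty, as the interface comment explains.
class PartMetadataManagerOrdinarySketch : public IPartMetadataManager
{
public:
    explicit PartMetadataManagerOrdinarySketch(const IMergeTreeDataPart * part_) : IPartMetadataManager(part_) {}
    std::unique_ptr<SeekableReadBuffer> read(const String & file_name) const override
    {
        String path = fs::path(part->getFullRelativePath()) / file_name;
        size_t file_size = disk->getFileSize(path);
        return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
    }
    bool exists(const String & file_name) const override
    {
        return disk->exists(fs::path(part->getFullRelativePath()) / file_name);
    }
    void deleteAll(bool /*include_projection*/) override {}
    void assertAllDeleted(bool /*include_projection*/) const override {}
    void updateAll(bool /*include_projection*/) override {}
    std::unordered_map<String, uint128> check() const override { return {}; }
};
}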

View File

@ -53,7 +53,7 @@ String Range::toString() const
/// Example: for `Hello\_World% ...` string it returns `Hello_World`, and for `%test%` returns an empty string.
static String extractFixedPrefixFromLikePattern(const String & like_pattern)
String extractFixedPrefixFromLikePattern(const String & like_pattern)
{
String fixed_prefix;

View File

@ -442,4 +442,6 @@ private:
bool strict;
};
String extractFixedPrefixFromLikePattern(const String & like_pattern);
}

View File

@ -279,14 +279,17 @@ bool MergeFromLogEntryTask::finalize(ReplicatedMergeMutateTaskBase::PartLogWrite
ProfileEvents::increment(ProfileEvents::DataAfterMergeDiffersFromReplica);
LOG_ERROR(log,
"{}. Data after merge is not byte-identical to data on another replicas. There could be several"
" reasons: 1. Using newer version of compression library after server update. 2. Using another"
" compression method. 3. Non-deterministic compression algorithm (highly unlikely). 4."
" Non-deterministic merge algorithm due to logical error in code. 5. Data corruption in memory due"
" to bug in code. 6. Data corruption in memory due to hardware issue. 7. Manual modification of"
" source data after server startup. 8. Manual modification of checksums stored in ZooKeeper. 9."
" Part format related settings like 'enable_mixed_granularity_parts' are different on different"
" replicas. We will download merged part from replica to force byte-identical result.",
"{}. Data after merge is not byte-identical to data on another replicas. There could be several reasons:"
" 1. Using newer version of compression library after server update."
" 2. Using another compression method."
" 3. Non-deterministic compression algorithm (highly unlikely)."
" 4. Non-deterministic merge algorithm due to logical error in code."
" 5. Data corruption in memory due to bug in code."
" 6. Data corruption in memory due to hardware issue."
" 7. Manual modification of source data after server startup."
" 8. Manual modification of checksums stored in ZooKeeper."
" 9. Part format related settings like 'enable_mixed_granularity_parts' are different on different replicas."
" We will download merged part from replica to force byte-identical result.",
getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException());

View File

@ -214,6 +214,7 @@ MergeTreeData::MergeTreeData(
, parts_mover(this)
, background_operations_assignee(*this, BackgroundJobsAssignee::Type::DataProcessing, getContext())
, background_moves_assignee(*this, BackgroundJobsAssignee::Type::Moving, getContext())
, use_metadata_cache(getSettings()->use_metadata_cache)
{
context_->getGlobalContext()->initializeBackgroundExecutorsIfNeeded();
@ -333,6 +334,11 @@ MergeTreeData::MergeTreeData(
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
"'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);
#if !USE_ROCKSDB
if (use_metadata_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't use merge tree metadata cache if clickhouse was compiled without rocksdb");
#endif
common_assignee_trigger = [this] (bool delay) noexcept
{
if (delay)
@ -1372,7 +1378,6 @@ void MergeTreeData::loadDataParts(bool skip_sanity_checks)
LOG_DEBUG(log, "Loaded data parts ({} items)", data_parts_indexes.size());
}
/// Is the part directory old.
/// True if its modification time and the modification times of all files inside it are less than the threshold.
/// (Only files on the first level of nesting are considered).

View File

@ -946,6 +946,7 @@ protected:
friend class StorageReplicatedMergeTree;
friend class MergeTreeDataWriter;
friend class MergeTask;
friend class IPartMetadataManager;
bool require_part_metadata;
@ -1028,6 +1029,7 @@ protected:
/// And for ReplicatedMergeTree we don't have LogEntry type for this operation.
BackgroundJobsAssignee background_operations_assignee;
BackgroundJobsAssignee background_moves_assignee;
bool use_metadata_cache;
/// Strongly connected with two fields above.
/// Every task that is finished will ask to assign a new one into an executor.

View File

@ -69,6 +69,7 @@ private:
ColumnSize getColumnSizeImpl(const NameAndTypePair & column, std::unordered_set<String> * processed_substreams) const;
void calculateEachColumnSizes(ColumnSizeByName & each_columns_size, ColumnSize & total_size) const override;
};
}

View File

@ -0,0 +1,107 @@
#include "MergeTreeMetadataCache.h"
#if USE_ROCKSDB
#include <Common/ProfileEvents.h>
#include <base/logger_useful.h>
namespace ProfileEvents
{
extern const Event MergeTreeMetadataCachePut;
extern const Event MergeTreeMetadataCacheGet;
extern const Event MergeTreeMetadataCacheDelete;
extern const Event MergeTreeMetadataCacheSeek;
}
namespace DB
{
namespace ErrorCodes
{
extern const int SYSTEM_ERROR;
}
std::unique_ptr<MergeTreeMetadataCache> MergeTreeMetadataCache::create(const String & dir, size_t size)
{
assert(size != 0);
rocksdb::Options options;
rocksdb::BlockBasedTableOptions table_options;
rocksdb::DB * db;
options.create_if_missing = true;
auto cache = rocksdb::NewLRUCache(size);
table_options.block_cache = cache;
options.table_factory.reset(rocksdb::NewBlockBasedTableFactory(table_options));
rocksdb::Status status = rocksdb::DB::Open(options, dir, &db);
if (status != rocksdb::Status::OK())
throw Exception(
ErrorCodes::SYSTEM_ERROR,
"Fail to open rocksdb path at: {} status:{}. You can try to remove the cache (this will not affect any table data).",
dir,
status.ToString());
return std::make_unique<MergeTreeMetadataCache>(db);
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::put(const String & key, const String & value)
{
auto options = rocksdb::WriteOptions();
options.sync = true;
options.disableWAL = false;
auto status = rocksdb->Put(options, key, value);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCachePut);
return status;
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::del(const String & key)
{
auto options = rocksdb::WriteOptions();
options.sync = true;
options.disableWAL = false;
auto status = rocksdb->Delete(options, key);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheDelete);
LOG_TRACE(log, "Delete key:{} from MergeTreeMetadataCache status:{}", key, status.ToString());
return status;
}
MergeTreeMetadataCache::Status MergeTreeMetadataCache::get(const String & key, String & value)
{
auto status = rocksdb->Get(rocksdb::ReadOptions(), key, &value);
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheGet);
LOG_TRACE(log, "Get key:{} from MergeTreeMetadataCache status:{}", key, status.ToString());
return status;
}
void MergeTreeMetadataCache::getByPrefix(const String & prefix, Strings & keys, Strings & values)
{
auto * it = rocksdb->NewIterator(rocksdb::ReadOptions());
rocksdb::Slice target(prefix);
for (it->Seek(target); it->Valid(); it->Next())
{
const auto key = it->key();
if (!key.starts_with(target))
break;
const auto value = it->value();
keys.emplace_back(key.data(), key.size());
values.emplace_back(value.data(), value.size());
}
LOG_TRACE(log, "Seek with prefix:{} from MergeTreeMetadataCache items:{}", prefix, keys.size());
ProfileEvents::increment(ProfileEvents::MergeTreeMetadataCacheSeek);
delete it;
}
uint64_t MergeTreeMetadataCache::getEstimateNumKeys() const
{
uint64_t keys = 0;
rocksdb->GetAggregatedIntProperty("rocksdb.estimate-num-keys", &keys);
return keys;
}
void MergeTreeMetadataCache::shutdown()
{
rocksdb->Close();
rocksdb.reset();
}
}
#endif

View File

@ -0,0 +1,45 @@
#pragma once
#include "config_core.h"
#if USE_ROCKSDB
#include <base/types.h>
#include <Core/Types.h>
#include <Poco/Logger.h>
#include <rocksdb/table.h>
#include <rocksdb/db.h>
namespace DB
{
class MergeTreeMetadataCache
{
public:
using Status = rocksdb::Status;
static std::unique_ptr<MergeTreeMetadataCache> create(const String & dir, size_t size);
explicit MergeTreeMetadataCache(rocksdb::DB * rocksdb_) : rocksdb{rocksdb_}
{
assert(rocksdb);
}
MergeTreeMetadataCache(const MergeTreeMetadataCache &) = delete;
MergeTreeMetadataCache & operator=(const MergeTreeMetadataCache &) = delete;
Status put(const String & key, const String & value);
Status del(const String & key);
Status get(const String & key, String & value);
void getByPrefix(const String & prefix, Strings & keys, Strings & values);
uint64_t getEstimateNumKeys() const;
void shutdown();
private:
std::unique_ptr<rocksdb::DB> rocksdb;
Poco::Logger * log = &Poco::Logger::get("MergeTreeMetadataCache");
};
using MergeTreeMetadataCachePtr = std::shared_ptr<MergeTreeMetadataCache>;
}
#endif
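A hedged usage sketch of the API declared above, not part of the diff; the cache directory, the cache size and the key layout (a part-path prefix followed by the metadata file name) are illustrative assumptions.
#include <Storages/MergeTree/MergeTreeMetadataCache.h>
#include <iostream>
int main()
{
#if USE_ROCKSDB
    auto cache = DB::MergeTreeMetadataCache::create("/tmp/mt_metadata_cache_rocksdb", 256 * 1024 * 1024);
    /// Hypothetical key scheme: one key per metadata file, prefixed by the part's relative path.
    cache->put("store/abc/all_1_1_0/count.txt", "100");
    cache->put("store/abc/all_1_1_0/columns.txt", "columns format version: 1");
    DB::Strings keys;
    DB::Strings values;
    cache->getByPrefix("store/abc/all_1_1_0/", keys, values);  /// enumerate everything cached for the part
    for (size_t i = 0; i < keys.size(); ++i)
        std::cout << keys[i] << " -> " << values[i].size() << " bytes" << std::endl;
    std::cout << "approximate number of keys: " << cache->getEstimateNumKeys() << std::endl;
    cache->shutdown();
#endif
    return 0;
}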

View File

@ -178,12 +178,6 @@ namespace
};
}
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
size_t file_size = disk->getFileSize(path);
return disk->readFile(path, ReadSettings().adjustBufferSize(file_size), file_size);
}
String MergeTreePartition::getID(const MergeTreeData & storage) const
{
return getID(storage.getInMemoryMetadataPtr()->getPartitionKey().sample_block);
@ -373,15 +367,15 @@ void MergeTreePartition::serializeText(const MergeTreeData & storage, WriteBuffe
}
}
void MergeTreePartition::load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path)
void MergeTreePartition::load(const MergeTreeData & storage, const PartMetadataManagerPtr & manager)
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (!metadata_snapshot->hasPartitionKey())
return;
const auto & partition_key_sample = adjustPartitionKey(metadata_snapshot, storage.getContext()).sample_block;
auto partition_file_path = part_path + "partition.dat";
auto file = openForReading(disk, partition_file_path);
auto file = manager->read("partition.dat");
value.resize(partition_key_sample.columns());
for (size_t i = 0; i < partition_key_sample.columns(); ++i)
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->deserializeBinary(value[i], *file);
@ -402,7 +396,9 @@ std::unique_ptr<WriteBufferFromFileBase> MergeTreePartition::store(const Block &
auto out = disk->writeFile(part_path + "partition.dat");
HashingWriteBuffer out_hashing(*out);
for (size_t i = 0; i < value.size(); ++i)
{
partition_key_sample.getByPosition(i).type->getDefaultSerialization()->serializeBinary(value[i], out_hashing);
}
out_hashing.next();
checksums.files["partition.dat"].file_size = out_hashing.count();
@ -462,4 +458,14 @@ KeyDescription MergeTreePartition::adjustPartitionKey(const StorageMetadataPtr &
return partition_key;
}
void MergeTreePartition::appendFiles(const MergeTreeData & storage, Strings& files)
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
if (!metadata_snapshot->hasPartitionKey())
return;
files.push_back("partition.dat");
}
}

View File

@ -4,6 +4,7 @@
#include <Disks/IDisk.h>
#include <IO/WriteBuffer.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IPartMetadataManager.h>
#include <Core/Field.h>
namespace DB
@ -37,7 +38,8 @@ public:
void serializeText(const MergeTreeData & storage, WriteBuffer & out, const FormatSettings & format_settings) const;
void load(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path);
void load(const MergeTreeData & storage, const PartMetadataManagerPtr & manager);
/// Store functions return write buffer with written but not finalized data.
/// User must call finish() for returned object.
[[nodiscard]] std::unique_ptr<WriteBufferFromFileBase> store(const MergeTreeData & storage, const DiskPtr & disk, const String & part_path, MergeTreeDataPartChecksums & checksums) const;
@ -47,6 +49,8 @@ public:
void create(const StorageMetadataPtr & metadata_snapshot, Block block, size_t row, ContextPtr context);
static void appendFiles(const MergeTreeData & storage, Strings & files);
/// Adjust partition key and execute its expression on block. Return sample block according to used expression.
static NamesAndTypesList executePartitionByExpression(const StorageMetadataPtr & metadata_snapshot, Block & block, ContextPtr context);

View File

@ -139,6 +139,7 @@ struct Settings;
/** Experimental/work in progress feature. Unsafe for production. */ \
M(UInt64, part_moves_between_shards_enable, 0, "Experimental/Incomplete feature to move parts between shards. Does not take into account sharding expressions.", 0) \
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
M(Bool, use_metadata_cache, false, "Experimental feature to speed up parts loading process by using MergeTree metadata cache", 0) \
\
/** Obsolete settings. Kept for backward compatibility only. */ \
M(UInt64, min_relative_delay_to_yield_leadership, 120, "Obsolete setting, does nothing.", 0) \

Some files were not shown because too many files have changed in this diff.