Merge remote-tracking branch 'origin/master' into parallel-replicas-not-enough-replicas

Commit 2d446fc1ca
.github/workflows/libfuzzer.yml (vendored, new file, 95 lines)
@@ -0,0 +1,95 @@
name: libFuzzer

env:
  # Force the stdout and stderr streams to be unbuffered
  PYTHONUNBUFFERED: 1

on: # yamllint disable-line rule:truthy
  # schedule:
  #   - cron: '0 0 2 31 1' # never for now
  workflow_call:
jobs:
  BuilderFuzzers:
    runs-on: [self-hosted, builder]
    steps:
      - name: Set envs
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          TEMP_PATH=${{runner.temp}}/build_check
          IMAGES_PATH=${{runner.temp}}/images_path
          REPO_COPY=${{runner.temp}}/build_check/ClickHouse
          CACHES_PATH=${{runner.temp}}/../ccaches
          BUILD_NAME=fuzzers
          EOF
      - name: Download changed images
        # even if artifact does not exist, e.g. on `do not test` label or failed Docker job
        continue-on-error: true
        uses: actions/download-artifact@v3
        with:
          name: changed_images
          path: ${{ env.IMAGES_PATH }}
      - name: Check out repository code
        uses: ClickHouse/checkout@v1
        with:
          clear-repository: true
          submodules: true
          ref: ${{github.ref}}
      - name: Build
        run: |
          sudo rm -fr "$TEMP_PATH"
          mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY/tests/ci" && python3 build_check.py "$BUILD_NAME"
      - name: Upload build URLs to artifacts
        if: ${{ success() || failure() }}
        uses: actions/upload-artifact@v3
        with:
          name: ${{ env.BUILD_URLS }}
          path: ${{ env.TEMP_PATH }}/${{ env.BUILD_URLS }}.json
      - name: Cleanup
        if: always()
        run: |
          docker ps --quiet | xargs --no-run-if-empty docker kill ||:
          docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
          sudo rm -fr "$TEMP_PATH" "$CACHES_PATH"
  libFuzzerTest:
    needs: [BuilderFuzzers]
    runs-on: [self-hosted, func-tester]
    steps:
      - name: Set envs
        run: |
          cat >> "$GITHUB_ENV" << 'EOF'
          TEMP_PATH=${{runner.temp}}/libfuzzer
          REPORTS_PATH=${{runner.temp}}/reports_dir
          CHECK_NAME=libFuzzer tests
          REPO_COPY=${{runner.temp}}/libfuzzer/ClickHouse
          KILL_TIMEOUT=10800
          EOF
      - name: Download changed images
        # even if artifact does not exist, e.g. on `do not test` label or failed Docker job
        continue-on-error: true
        uses: actions/download-artifact@v3
        with:
          name: changed_images
          path: ${{ env.TEMP_PATH }}
      - name: Download json reports
        uses: actions/download-artifact@v3
        with:
          path: ${{ env.REPORTS_PATH }}
      - name: Check out repository code
        uses: ClickHouse/checkout@v1
        with:
          clear-repository: true
      - name: libFuzzer test
        run: |
          sudo rm -fr "$TEMP_PATH"
          mkdir -p "$TEMP_PATH"
          cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
          cd "$REPO_COPY/tests/ci"
          python3 libfuzzer_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
      - name: Cleanup
        if: always()
        run: |
          docker ps --quiet | xargs --no-run-if-empty docker kill ||:
          docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
          sudo rm -fr "$TEMP_PATH"
.github/workflows/pull_request.yml (vendored, 13 changed lines)
@@ -5187,9 +5187,16 @@ jobs:
          cd "$GITHUB_WORKSPACE/tests/ci"
          python3 finish_check.py
          python3 merge_pr.py --check-approved
##############################################################################################
########################### SQLLOGIC TEST ###################################################
##############################################################################################
#############################################################################################
####################################### libFuzzer ###########################################
#############################################################################################
  libFuzzer:
    if: contains(github.event.pull_request.labels.*.name, 'libFuzzer')
    needs: [DockerHubPush, StyleCheck]
    uses: ./.github/workflows/libfuzzer.yml
##############################################################################################
############################ SQLLOGIC TEST ###################################################
##############################################################################################
  SQLLogicTestRelease:
    needs: [BuilderDebRelease]
    runs-on: [self-hosted, func-tester]
@ -19,6 +19,7 @@ include (cmake/tools.cmake)
|
||||
include (cmake/ccache.cmake)
|
||||
include (cmake/clang_tidy.cmake)
|
||||
include (cmake/git.cmake)
|
||||
include (cmake/utils.cmake)
|
||||
|
||||
# Ignore export() since we don't use it,
|
||||
# but it gets broken with a global targets via link_libraries()
|
||||
@ -562,22 +563,6 @@ add_subdirectory (programs)
|
||||
add_subdirectory (tests)
|
||||
add_subdirectory (utils)
|
||||
|
||||
# Function get_all_targets collects all targets recursively
|
||||
function(get_all_targets var)
|
||||
macro(get_all_targets_recursive targets dir)
|
||||
get_property(subdirectories DIRECTORY ${dir} PROPERTY SUBDIRECTORIES)
|
||||
foreach(subdir ${subdirectories})
|
||||
get_all_targets_recursive(${targets} ${subdir})
|
||||
endforeach()
|
||||
get_property(current_targets DIRECTORY ${dir} PROPERTY BUILDSYSTEM_TARGETS)
|
||||
list(APPEND ${targets} ${current_targets})
|
||||
endmacro()
|
||||
|
||||
set(targets)
|
||||
get_all_targets_recursive(targets ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
set(${var} ${targets} PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
if (FUZZER)
|
||||
# Bundle fuzzers target
|
||||
add_custom_target(fuzzers)
|
||||
@ -592,14 +577,18 @@ if (FUZZER)
|
||||
# clickhouse fuzzer isn't working correctly
|
||||
# initial PR https://github.com/ClickHouse/ClickHouse/pull/27526
|
||||
#if (target MATCHES ".+_fuzzer" OR target STREQUAL "clickhouse")
|
||||
if (target MATCHES ".+_fuzzer")
|
||||
if (target_type STREQUAL "EXECUTABLE" AND target MATCHES ".+_fuzzer")
|
||||
message(STATUS "${target} instrumented with fuzzer")
|
||||
target_link_libraries(${target} PUBLIC ch_contrib::fuzzer)
|
||||
# Add to fuzzers bundle
|
||||
add_dependencies(fuzzers ${target})
|
||||
get_target_filename(${target} target_bin_name)
|
||||
get_target_property(target_bin_dir ${target} BINARY_DIR)
|
||||
add_custom_command(TARGET fuzzers POST_BUILD COMMAND mv "${target_bin_dir}/${target_bin_name}" "${CMAKE_CURRENT_BINARY_DIR}/programs/" VERBATIM)
|
||||
endif()
|
||||
endif()
|
||||
endforeach()
|
||||
add_custom_command(TARGET fuzzers POST_BUILD COMMAND SRC=${CMAKE_SOURCE_DIR} BIN=${CMAKE_BINARY_DIR} OUT=${CMAKE_BINARY_DIR}/programs ${CMAKE_SOURCE_DIR}/tests/fuzz/build.sh VERBATIM)
|
||||
endif()
|
||||
|
||||
include (cmake/sanitize_targets.cmake)
|
||||
|
cmake/utils.cmake (new file, 120 lines)
@ -0,0 +1,120 @@
|
||||
# Useful stuff
|
||||
|
||||
# Function get_all_targets collects all targets recursively
|
||||
function(get_all_targets outvar)
|
||||
macro(get_all_targets_recursive targets dir)
|
||||
get_property(subdirectories DIRECTORY ${dir} PROPERTY SUBDIRECTORIES)
|
||||
foreach(subdir ${subdirectories})
|
||||
get_all_targets_recursive(${targets} ${subdir})
|
||||
endforeach()
|
||||
get_property(current_targets DIRECTORY ${dir} PROPERTY BUILDSYSTEM_TARGETS)
|
||||
list(APPEND ${targets} ${current_targets})
|
||||
endmacro()
|
||||
|
||||
set(targets)
|
||||
get_all_targets_recursive(targets ${CMAKE_CURRENT_SOURCE_DIR})
|
||||
set(${outvar} ${targets} PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
|
||||
# Function get_target_filename calculates target's output file name
|
||||
function(get_target_filename target outvar)
|
||||
get_target_property(prop_type "${target}" TYPE)
|
||||
get_target_property(prop_is_framework "${target}" FRAMEWORK)
|
||||
get_target_property(prop_outname "${target}" OUTPUT_NAME)
|
||||
get_target_property(prop_archive_outname "${target}" ARCHIVE_OUTPUT_NAME)
|
||||
get_target_property(prop_library_outname "${target}" LIBRARY_OUTPUT_NAME)
|
||||
get_target_property(prop_runtime_outname "${target}" RUNTIME_OUTPUT_NAME)
|
||||
# message("prop_archive_outname: ${prop_archive_outname}")
|
||||
# message("prop_library_outname: ${prop_library_outname}")
|
||||
# message("prop_runtime_outname: ${prop_runtime_outname}")
|
||||
if(DEFINED CMAKE_BUILD_TYPE)
|
||||
get_target_property(prop_cfg_outname "${target}" "${OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
|
||||
get_target_property(prop_archive_cfg_outname "${target}" "${ARCHIVE_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
|
||||
get_target_property(prop_library_cfg_outname "${target}" "${LIBRARY_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
|
||||
get_target_property(prop_runtime_cfg_outname "${target}" "${RUNTIME_OUTPUT_NAME}_${CMAKE_BUILD_TYPE}")
|
||||
# message("prop_archive_cfg_outname: ${prop_archive_cfg_outname}")
|
||||
# message("prop_library_cfg_outname: ${prop_library_cfg_outname}")
|
||||
# message("prop_runtime_cfg_outname: ${prop_runtime_cfg_outname}")
|
||||
if(NOT ("${prop_cfg_outname}" STREQUAL "prop_cfg_outname-NOTFOUND"))
|
||||
set(prop_outname "${prop_cfg_outname}")
|
||||
endif()
|
||||
if(NOT ("${prop_archive_cfg_outname}" STREQUAL "prop_archive_cfg_outname-NOTFOUND"))
|
||||
set(prop_archive_outname "${prop_archive_cfg_outname}")
|
||||
endif()
|
||||
if(NOT ("${prop_library_cfg_outname}" STREQUAL "prop_library_cfg_outname-NOTFOUND"))
|
||||
set(prop_library_outname "${prop_library_cfg_outname}")
|
||||
endif()
|
||||
if(NOT ("${prop_runtime_cfg_outname}" STREQUAL "prop_runtime_cfg_outname-NOTFOUND"))
|
||||
set(prop_runtime_outname "${prop_runtime_cfg_outname}")
|
||||
endif()
|
||||
endif()
|
||||
set(outname "${target}")
|
||||
if(NOT ("${prop_outname}" STREQUAL "prop_outname-NOTFOUND"))
|
||||
set(outname "${prop_outname}")
|
||||
endif()
|
||||
if("${prop_is_framework}")
|
||||
set(filename "${outname}")
|
||||
elseif(prop_type STREQUAL "STATIC_LIBRARY")
|
||||
if(NOT ("${prop_archive_outname}" STREQUAL "prop_archive_outname-NOTFOUND"))
|
||||
set(outname "${prop_archive_outname}")
|
||||
endif()
|
||||
set(filename "${CMAKE_STATIC_LIBRARY_PREFIX}${outname}${CMAKE_STATIC_LIBRARY_SUFFIX}")
|
||||
elseif(prop_type STREQUAL "MODULE_LIBRARY")
|
||||
if(NOT ("${prop_library_outname}" STREQUAL "prop_library_outname-NOTFOUND"))
|
||||
set(outname "${prop_library_outname}")
|
||||
endif()
|
||||
set(filename "${CMAKE_SHARED_MODULE_LIBRARY_PREFIX}${outname}${CMAKE_SHARED_MODULE_LIBRARY_SUFFIX}")
|
||||
elseif(prop_type STREQUAL "SHARED_LIBRARY")
|
||||
if(WIN32)
|
||||
if(NOT ("${prop_runtime_outname}" STREQUAL "prop_runtime_outname-NOTFOUND"))
|
||||
set(outname "${prop_runtime_outname}")
|
||||
endif()
|
||||
else()
|
||||
if(NOT ("${prop_library_outname}" STREQUAL "prop_library_outname-NOTFOUND"))
|
||||
set(outname "${prop_library_outname}")
|
||||
endif()
|
||||
endif()
|
||||
set(filename "${CMAKE_SHARED_LIBRARY_PREFIX}${outname}${CMAKE_SHARED_LIBRARY_SUFFIX}")
|
||||
elseif(prop_type STREQUAL "EXECUTABLE")
|
||||
if(NOT ("${prop_runtime_outname}" STREQUAL "prop_runtime_outname-NOTFOUND"))
|
||||
set(outname "${prop_runtime_outname}")
|
||||
endif()
|
||||
set(filename "${CMAKE_EXECUTABLE_PREFIX}${outname}${CMAKE_EXECUTABLE_SUFFIX}")
|
||||
else()
|
||||
message(FATAL_ERROR "target \"${target}\" is not of type STATIC_LIBRARY, MODULE_LIBRARY, SHARED_LIBRARY, or EXECUTABLE.")
|
||||
endif()
|
||||
set("${outvar}" "${filename}" PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
|
||||
# Function get_cmake_properties returns list of all properties that cmake supports
|
||||
function(get_cmake_properties outvar)
|
||||
execute_process(COMMAND cmake --help-property-list OUTPUT_VARIABLE cmake_properties)
|
||||
# Convert command output into a CMake list
|
||||
string(REGEX REPLACE ";" "\\\\;" cmake_properties "${cmake_properties}")
|
||||
string(REGEX REPLACE "\n" ";" cmake_properties "${cmake_properties}")
|
||||
list(REMOVE_DUPLICATES cmake_properties)
|
||||
set("${outvar}" "${cmake_properties}" PARENT_SCOPE)
|
||||
endfunction()
|
||||
|
||||
# Function get_target_property_list returns list of all properties set for target
|
||||
function(get_target_property_list target outvar)
|
||||
get_cmake_properties(cmake_property_list)
|
||||
foreach(property ${cmake_property_list})
|
||||
string(REPLACE "<CONFIG>" "${CMAKE_BUILD_TYPE}" property ${property})
|
||||
|
||||
# https://stackoverflow.com/questions/32197663/how-can-i-remove-the-the-location-property-may-not-be-read-from-target-error-i
|
||||
if(property STREQUAL "LOCATION" OR property MATCHES "^LOCATION_" OR property MATCHES "_LOCATION$")
|
||||
continue()
|
||||
endif()
|
||||
|
||||
get_property(was_set TARGET ${target} PROPERTY ${property} SET)
|
||||
if(was_set)
|
||||
get_target_property(value ${target} ${property})
|
||||
string(REGEX REPLACE ";" "\\\\\\\\;" value "${value}")
|
||||
list(APPEND outvar "${property} = ${value}")
|
||||
endif()
|
||||
endforeach()
|
||||
set(${outvar} ${${outvar}} PARENT_SCOPE)
|
||||
endfunction()
|
@ -21,6 +21,10 @@
|
||||
"name": "clickhouse/fuzzer",
|
||||
"dependent": []
|
||||
},
|
||||
"docker/test/libfuzzer": {
|
||||
"name": "clickhouse/libfuzzer",
|
||||
"dependent": []
|
||||
},
|
||||
"docker/test/performance-comparison": {
|
||||
"name": "clickhouse/performance-comparison",
|
||||
"dependent": []
|
||||
@ -121,6 +125,7 @@
|
||||
"name": "clickhouse/test-base",
|
||||
"dependent": [
|
||||
"docker/test/fuzzer",
|
||||
"docker/test/libfuzzer",
|
||||
"docker/test/integration/base",
|
||||
"docker/test/keeper-jepsen",
|
||||
"docker/test/server-jepsen",
|
||||
|
@ -78,6 +78,7 @@ RUN add-apt-repository ppa:ubuntu-toolchain-r/test --yes \
|
||||
python3-boto3 \
|
||||
yasm \
|
||||
zstd \
|
||||
zip \
|
||||
&& apt-get clean \
|
||||
&& rm -rf /var/lib/apt/lists
|
||||
|
||||
|
@ -97,11 +97,10 @@ if [ -n "$MAKE_DEB" ]; then
|
||||
bash -x /build/packages/build
|
||||
fi
|
||||
|
||||
if [ "$BUILD_TARGET" != "fuzzers" ]; then
|
||||
mv ./programs/clickhouse* /output
|
||||
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
|
||||
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
|
||||
fi
|
||||
mv ./programs/clickhouse* /output || mv ./programs/*_fuzzer /output
|
||||
[ -x ./programs/self-extracting/clickhouse ] && mv ./programs/self-extracting/clickhouse /output
|
||||
mv ./src/unit_tests_dbms /output ||: # may not exist for some binary builds
|
||||
mv ./programs/*.dict ./programs/*.options ./programs/*_seed_corpus.zip /output ||: # libFuzzer oss-fuzz compatible infrastructure
|
||||
|
||||
prepare_combined_output () {
|
||||
local OUTPUT
|
||||
|
docker/test/libfuzzer/Dockerfile (new file, 43 lines)
@@ -0,0 +1,43 @@
ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG

# ARG for quick switch to a given ubuntu mirror
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list

ENV LANG=C.UTF-8
ENV TZ=Europe/Amsterdam
RUN ln -snf /usr/share/zoneinfo/$TZ /etc/localtime && echo $TZ > /etc/timezone

RUN apt-get update \
    && DEBIAN_FRONTEND=noninteractive apt-get install --yes --no-install-recommends \
        ca-certificates \
        libc6-dbg \
        moreutils \
        ncdu \
        p7zip-full \
        parallel \
        psmisc \
        python3 \
        python3-pip \
        rsync \
        tree \
        tzdata \
        vim \
        wget \
    && apt-get autoremove --yes \
    && apt-get clean \
    && rm -rf /var/lib/apt/lists/*

RUN pip3 install Jinja2

COPY * /

ENV FUZZER_ARGS="-max_total_time=60"

SHELL ["/bin/bash", "-c"]
CMD set -o pipefail \
    && timeout -s 9 1h /run_libfuzzer.py 2>&1 | ts "$(printf '%%Y-%%m-%%d %%H:%%M:%%S\t')" | tee main.log

# docker run --network=host --volume <workspace>:/workspace -e PR_TO_TEST=<> -e SHA_TO_TEST=<> clickhouse/libfuzzer
docker/test/libfuzzer/run_libfuzzer.py (new executable file, 77 lines)
@@ -0,0 +1,77 @@
#!/usr/bin/env python3

import configparser
import logging
import os
from pathlib import Path
import subprocess

DEBUGGER = os.getenv("DEBUGGER", "")
FUZZER_ARGS = os.getenv("FUZZER_ARGS", "")


def run_fuzzer(fuzzer: str):
    logging.info(f"Running fuzzer {fuzzer}...")

    corpus_dir = f"{fuzzer}.in"
    with Path(corpus_dir) as path:
        if not path.exists() or not path.is_dir():
            corpus_dir = ""

    options_file = f"{fuzzer}.options"
    custom_libfuzzer_options = ""

    with Path(options_file) as path:
        if path.exists() and path.is_file():
            parser = configparser.ConfigParser()
            parser.read(path)

            if parser.has_section("asan"):
                os.environ[
                    "ASAN_OPTIONS"
                ] = f"{os.environ['ASAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['asan'].items())}"

            if parser.has_section("msan"):
                os.environ[
                    "MSAN_OPTIONS"
                ] = f"{os.environ['MSAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['msan'].items())}"

            if parser.has_section("ubsan"):
                os.environ[
                    "UBSAN_OPTIONS"
                ] = f"{os.environ['UBSAN_OPTIONS']}:{':'.join('%s=%s' % (key, value) for key, value in parser['ubsan'].items())}"

            if parser.has_section("libfuzzer"):
                custom_libfuzzer_options = " ".join(
                    "-%s=%s" % (key, value)
                    for key, value in parser["libfuzzer"].items()
                )

    cmd_line = f"{DEBUGGER} ./{fuzzer} {FUZZER_ARGS} {corpus_dir}"
    if custom_libfuzzer_options:
        cmd_line += f" {custom_libfuzzer_options}"

    if not "-dict=" in cmd_line and Path(f"{fuzzer}.dict").exists():
        cmd_line += f" -dict={fuzzer}.dict"

    cmd_line += " < /dev/null"

    logging.info(f"...will execute: {cmd_line}")
    subprocess.check_call(cmd_line, shell=True)


def main():
    logging.basicConfig(level=logging.INFO)

    subprocess.check_call("ls -al", shell=True)

    with Path() as current:
        for fuzzer in current.iterdir():
            if (current / fuzzer).is_file() and os.access(current / fuzzer, os.X_OK):
                run_fuzzer(fuzzer)

    exit(0)


if __name__ == "__main__":
    main()
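As an illustrative aside (not part of the diff): the script above follows the oss-fuzz convention of reading a per-fuzzer `.options` file and appending its `[libfuzzer]` section to the command line. The sketch below shows that translation for a hypothetical binary named `example_fuzzer` with made-up option values; the `-max_total_time=60` default comes from `FUZZER_ARGS` in the Dockerfile above, and the `<fuzzer>.in` corpus directory mirrors the script.

#!/usr/bin/env python3
# Illustration only: mirrors the "[libfuzzer]" branch of run_libfuzzer.py above
# for a hypothetical "example_fuzzer" binary and "example_fuzzer.options" file.
import configparser

options_text = """
[libfuzzer]
max_len = 4096
dict = example_fuzzer.dict
"""

parser = configparser.ConfigParser()
parser.read_string(options_text)

# Same formatting as run_libfuzzer.py uses for the [libfuzzer] section.
custom_libfuzzer_options = " ".join(
    "-%s=%s" % (key, value) for key, value in parser["libfuzzer"].items()
)

cmd_line = f"./example_fuzzer -max_total_time=60 example_fuzzer.in {custom_libfuzzer_options} < /dev/null"
print(cmd_line)
# ./example_fuzzer -max_total_time=60 example_fuzzer.in -max_len=4096 -dict=example_fuzzer.dict < /dev/null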
@@ -1354,3 +1354,4 @@ In this sample configuration:
- `_part_uuid` — Unique part identifier (if enabled MergeTree setting `assign_part_uuids`).
- `_partition_value` — Values (a tuple) of a `partition by` expression.
- `_sample_factor` — Sample factor (from the query).
- `_block_number` — Block number of the row, it is persisted on merges when `allow_experimental_block_number_column` is set to true.
@@ -142,7 +142,7 @@ As a result, the query cache stores for each query multiple (partial)
result blocks. While this behavior is a good default, it can be suppressed using setting
[query_cache_squash_partial_results](settings/settings.md#query-cache-squash-partial-results).

Also, results of queries with non-deterministic functions are not cached. Such functions include
Also, results of queries with non-deterministic functions are not cached by default. Such functions include
- functions for accessing dictionaries: [`dictGet()`](../sql-reference/functions/ext-dict-functions.md#dictGet) etc.
- [user-defined functions](../sql-reference/statements/create/function.md),
- functions which return the current date or time: [`now()`](../sql-reference/functions/date-time-functions.md#now),
@@ -158,7 +158,7 @@ Also, results of queries with non-deterministic functions are not cached. Such f
- functions which depend on the environment: [`currentUser()`](../sql-reference/functions/other-functions.md#currentUser),
  [`queryID()`](../sql-reference/functions/other-functions.md#queryID),
  [`getMacro()`](../sql-reference/functions/other-functions.md#getMacro) etc.
Caching of non-deterministic functions can be forced regardless using setting
To force caching of results of queries with non-deterministic functions regardless, use setting
[query_cache_store_results_of_queries_with_nondeterministic_functions](settings/settings.md#query-cache-store-results-of-queries-with-nondeterministic-functions).

Finally, entries in the query cache are not shared between users due to security reasons. For example, user A must not be able to bypass a
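An illustrative aside, not part of the diff: with the behaviour described above, storing the result of a query that calls a non-deterministic function has to be requested explicitly. A hedged sketch of doing so from Python by invoking clickhouse-client against a local server; the long setting name comes from the doc text above, while `use_query_cache` and the exact client flag syntax are assumptions on my part.

# Sketch only: run a query with a non-deterministic function and ask the
# query cache to store its result anyway.
import subprocess

subprocess.run(
    [
        "clickhouse-client",
        "--use_query_cache=1",
        "--query_cache_store_results_of_queries_with_nondeterministic_functions=1",
        "--query", "SELECT now()",
    ],
    check=True,
)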
@@ -854,3 +854,9 @@ Possible values:
- `Always` or `Never`.

Default value: `Never`

## allow_experimental_block_number_column

Persists virtual column `_block_number` on merges.

Default value: false.
@@ -157,7 +157,7 @@ At this time, it is not checked for one of the sorting stages, or when merging a
The `max_execution_time` parameter can be a bit tricky to understand.
It operates based on interpolation relative to the current query execution speed (this behaviour is controlled by [timeout_before_checking_execution_speed](#timeout-before-checking-execution-speed)).
ClickHouse will interrupt a query if the projected execution time exceeds the specified `max_execution_time`.
By default, the timeout_before_checking_execution_speed is set to 1 second. This means that after just one second of query execution, ClickHouse will begin estimating the total execution time.
By default, the timeout_before_checking_execution_speed is set to 10 seconds. This means that after 10 seconds of query execution, ClickHouse will begin estimating the total execution time.
If, for example, `max_execution_time` is set to 3600 seconds (1 hour), ClickHouse will terminate the query if the estimated time exceeds this 3600-second limit.
If you set `timeout_before_checking_execution_speed` to 0, ClickHouse will use clock time as the basis for `max_execution_time`.
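To make the projection idea above concrete, here is a minimal sketch. The exact estimator is internal to ClickHouse and not shown in this diff; the formula (projecting total runtime from the fraction of rows read so far) and all numbers below are illustrative assumptions only.

# Illustration of the documented behaviour, not ClickHouse's actual implementation:
# once timeout_before_checking_execution_speed has elapsed, the total runtime is
# projected from the progress so far and compared against max_execution_time.

def should_interrupt(elapsed_s: float, rows_read: int, total_rows: int,
                     max_execution_time_s: float,
                     timeout_before_checking_execution_speed_s: float = 10.0) -> bool:
    if elapsed_s < timeout_before_checking_execution_speed_s or rows_read == 0:
        return False  # too early to estimate
    projected_total_s = elapsed_s * total_rows / rows_read
    return projected_total_s > max_execution_time_s

# After 10 s the query has read 1% of its rows, so the projected total is ~1000 s:
print(should_interrupt(10, 1_000, 100_000, max_execution_time_s=3600))  # False: 1000 s < 3600 s
print(should_interrupt(10, 1_000, 100_000, max_execution_time_s=600))   # True: 1000 s > 600 s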
@ -32,6 +32,8 @@
|
||||
#include <Common/randomSeed.h>
|
||||
#include <Common/ThreadPool.h>
|
||||
#include <Loggers/Loggers.h>
|
||||
#include <Loggers/OwnFormattingChannel.h>
|
||||
#include <Loggers/OwnPatternFormatter.h>
|
||||
#include <IO/ReadBufferFromFile.h>
|
||||
#include <IO/ReadBufferFromString.h>
|
||||
#include <IO/WriteBufferFromFileDescriptor.h>
|
||||
@ -599,7 +601,9 @@ void LocalServer::processConfig()
|
||||
{
|
||||
auto poco_logs_level = Poco::Logger::parseLevel(level);
|
||||
Poco::Logger::root().setLevel(poco_logs_level);
|
||||
Poco::Logger::root().setChannel(Poco::AutoPtr<Poco::SimpleFileChannel>(new Poco::SimpleFileChannel(server_logs_file)));
|
||||
Poco::AutoPtr<OwnPatternFormatter> pf = new OwnPatternFormatter;
|
||||
Poco::AutoPtr<OwnFormattingChannel> log = new OwnFormattingChannel(pf, new Poco::SimpleFileChannel(server_logs_file));
|
||||
Poco::Logger::root().setChannel(log);
|
||||
logging_initialized = true;
|
||||
}
|
||||
else if (logging || is_interactive)
|
||||
|
@ -585,6 +585,8 @@
|
||||
M(700, USER_SESSION_LIMIT_EXCEEDED) \
|
||||
M(701, CLUSTER_DOESNT_EXIST) \
|
||||
M(702, CLIENT_INFO_DOES_NOT_MATCH) \
|
||||
M(703, INVALID_IDENTIFIER) \
|
||||
M(704, CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS) \
|
||||
\
|
||||
M(999, KEEPER_EXCEPTION) \
|
||||
M(1000, POCO_EXCEPTION) \
|
||||
|
@ -169,7 +169,7 @@ class IColumn;
|
||||
M(String, parallel_replicas_custom_key, "", "Custom key assigning work to replicas when parallel replicas are used.", 0) \
|
||||
M(ParallelReplicasCustomKeyFilterType, parallel_replicas_custom_key_filter_type, ParallelReplicasCustomKeyFilterType::DEFAULT, "Type of filter to use with custom key for parallel replicas. default - use modulo operation on the custom key, range - use range filter on custom key using all possible values for the value type of custom key.", 0) \
|
||||
\
|
||||
M(String, cluster_for_parallel_replicas, "default", "Cluster for a shard in which current server is located", 0) \
|
||||
M(String, cluster_for_parallel_replicas, "", "Cluster for a shard in which current server is located", 0) \
|
||||
M(UInt64, allow_experimental_parallel_reading_from_replicas, 0, "Use all the replicas from a shard for SELECT query execution. Reading is parallelized and coordinated dynamically. 0 - disabled, 1 - enabled, silently disable them in case of failure, 2 - enabled, throw an exception in case of failure", 0) \
|
||||
M(Float, parallel_replicas_single_task_marks_count_multiplier, 2, "A multiplier which will be added during calculation for minimal number of marks to retrieve from coordinator. This will be applied only for remote replicas.", 0) \
|
||||
M(Bool, parallel_replicas_for_non_replicated_merge_tree, false, "If true, ClickHouse will use parallel replicas algorithm also for non-replicated MergeTree tables", 0) \
|
||||
|
src/DataTypes/Utils.cpp (new file, 231 lines)
@ -0,0 +1,231 @@
|
||||
#include <DataTypes/Utils.h>
|
||||
#include <DataTypes/DataTypesDecimal.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <DataTypes/DataTypeMap.h>
|
||||
#include <DataTypes/DataTypeTuple.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
bool canBeSafelyCasted(const DataTypePtr & from_type, const DataTypePtr & to_type)
|
||||
{
|
||||
auto from_which_type = WhichDataType(from_type->getTypeId());
|
||||
bool to_type_was_nullable = isNullableOrLowCardinalityNullable(to_type);
|
||||
auto to_type_unwrapped = removeNullable(removeLowCardinality(to_type));
|
||||
|
||||
if (from_type->equals(*to_type_unwrapped))
|
||||
return true;
|
||||
|
||||
auto to_which_type = WhichDataType(to_type_unwrapped->getTypeId());
|
||||
|
||||
switch (from_which_type.idx)
|
||||
{
|
||||
case TypeIndex::UInt8:
|
||||
case TypeIndex::UInt16:
|
||||
case TypeIndex::UInt32:
|
||||
case TypeIndex::UInt64:
|
||||
case TypeIndex::UInt128:
|
||||
case TypeIndex::UInt256:
|
||||
{
|
||||
if (to_which_type.isUInt() &&
|
||||
to_type_unwrapped->getSizeOfValueInMemory() >= from_type->getSizeOfValueInMemory())
|
||||
return true;
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Int8:
|
||||
case TypeIndex::Int16:
|
||||
case TypeIndex::Int32:
|
||||
case TypeIndex::Int64:
|
||||
case TypeIndex::Int128:
|
||||
case TypeIndex::Int256:
|
||||
{
|
||||
if (to_which_type.isInt() &&
|
||||
to_type_unwrapped->getSizeOfValueInMemory() >= from_type->getSizeOfValueInMemory())
|
||||
return true;
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Float32:
|
||||
{
|
||||
if (to_which_type.isFloat64() || to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Float64:
|
||||
case TypeIndex::Date:
|
||||
case TypeIndex::Date32:
|
||||
case TypeIndex::DateTime:
|
||||
case TypeIndex::DateTime64:
|
||||
case TypeIndex::FixedString:
|
||||
case TypeIndex::Enum8:
|
||||
case TypeIndex::Enum16:
|
||||
case TypeIndex::IPv6:
|
||||
{
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Decimal32:
|
||||
case TypeIndex::Decimal64:
|
||||
case TypeIndex::Decimal128:
|
||||
case TypeIndex::Decimal256:
|
||||
{
|
||||
if (to_which_type.isDecimal())
|
||||
{
|
||||
auto from_type_decimal_precision = getDecimalPrecision(*from_type);
|
||||
auto to_type_decimal_precision = getDecimalPrecision(*to_type_unwrapped);
|
||||
if (from_type_decimal_precision > to_type_decimal_precision)
|
||||
return false;
|
||||
|
||||
auto from_type_decimal_scale = getDecimalScale(*from_type);
|
||||
auto to_type_decimal_scale = getDecimalScale(*to_type_unwrapped);
|
||||
if (from_type_decimal_scale > to_type_decimal_scale)
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::UUID:
|
||||
{
|
||||
if (to_which_type.isUInt128() || to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::IPv4:
|
||||
{
|
||||
if (to_which_type.isUInt32() || to_which_type.isUInt64() || to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Nullable:
|
||||
{
|
||||
if (to_type_was_nullable)
|
||||
{
|
||||
const auto & from_type_nullable = assert_cast<const DataTypeNullable &>(*from_type);
|
||||
return canBeSafelyCasted(from_type_nullable.getNestedType(), to_type_unwrapped);
|
||||
}
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::LowCardinality:
|
||||
{
|
||||
const auto & from_type_low_cardinality = assert_cast<const DataTypeLowCardinality &>(*from_type);
|
||||
return canBeSafelyCasted(from_type_low_cardinality.getDictionaryType(), to_type_unwrapped);
|
||||
}
|
||||
case TypeIndex::Array:
|
||||
{
|
||||
if (to_which_type.isArray())
|
||||
{
|
||||
const auto & from_type_array = assert_cast<const DataTypeArray &>(*from_type);
|
||||
const auto & to_type_array = assert_cast<const DataTypeArray &>(*to_type_unwrapped);
|
||||
return canBeSafelyCasted(from_type_array.getNestedType(), to_type_array.getNestedType());
|
||||
}
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Map:
|
||||
{
|
||||
if (to_which_type.isMap())
|
||||
{
|
||||
const auto & from_type_map = assert_cast<const DataTypeMap &>(*from_type);
|
||||
const auto & to_type_map = assert_cast<const DataTypeMap &>(*to_type_unwrapped);
|
||||
if (!canBeSafelyCasted(from_type_map.getKeyType(), to_type_map.getKeyType()))
|
||||
return false;
|
||||
|
||||
if (!canBeSafelyCasted(from_type_map.getValueType(), to_type_map.getValueType()))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (to_which_type.isArray())
|
||||
{
|
||||
// Map nested type is Array(Tuple(key_type, value_type))
|
||||
const auto & from_type_map = assert_cast<const DataTypeMap &>(*from_type);
|
||||
const auto & to_type_array = assert_cast<const DataTypeArray &>(*to_type_unwrapped);
|
||||
const auto * to_type_nested_tuple_type = typeid_cast<const DataTypeTuple *>(to_type_array.getNestedType().get());
|
||||
if (!to_type_nested_tuple_type)
|
||||
return false;
|
||||
|
||||
const auto & to_type_tuple_elements = to_type_nested_tuple_type->getElements();
|
||||
if (to_type_tuple_elements.size() != 2)
|
||||
return false;
|
||||
|
||||
if (!canBeSafelyCasted(from_type_map.getKeyType(), to_type_tuple_elements[0]))
|
||||
return false;
|
||||
|
||||
if (!canBeSafelyCasted(from_type_map.getValueType(), to_type_tuple_elements[1]))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::Tuple:
|
||||
{
|
||||
if (to_which_type.isTuple())
|
||||
{
|
||||
const auto & from_type_tuple = assert_cast<const DataTypeTuple &>(*from_type);
|
||||
const auto & to_type_tuple = assert_cast<const DataTypeTuple &>(*to_type_unwrapped);
|
||||
|
||||
const auto & from_tuple_type_elements = from_type_tuple.getElements();
|
||||
const auto & to_tuple_type_elements = to_type_tuple.getElements();
|
||||
|
||||
size_t lhs_type_elements_size = from_tuple_type_elements.size();
|
||||
if (lhs_type_elements_size != to_tuple_type_elements.size())
|
||||
return false;
|
||||
|
||||
for (size_t i = 0; i < lhs_type_elements_size; ++i)
|
||||
if (!canBeSafelyCasted(from_tuple_type_elements[i], to_tuple_type_elements[i]))
|
||||
return false;
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
if (to_which_type.isString())
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
case TypeIndex::String:
|
||||
case TypeIndex::Object:
|
||||
case TypeIndex::Set:
|
||||
case TypeIndex::Interval:
|
||||
case TypeIndex::Function:
|
||||
case TypeIndex::AggregateFunction:
|
||||
case TypeIndex::Nothing:
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
}
|
src/DataTypes/Utils.h (new file, 19 lines)
@ -0,0 +1,19 @@
|
||||
#pragma once
|
||||
|
||||
#include <DataTypes/IDataType.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
/** Returns true if from_type can be safely casted to to_type.
|
||||
*
|
||||
* Examples:
|
||||
* From type UInt8 to type UInt16 returns true.
|
||||
* From type UInt16 to type UInt8 returns false.
|
||||
* From type String to type LowCardinality(String) returns true.
|
||||
* From type LowCardinality(String) to type String returns true.
|
||||
* From type String to type UInt8 returns false.
|
||||
*/
|
||||
bool canBeSafelyCasted(const DataTypePtr & from_type, const DataTypePtr & to_type);
|
||||
|
||||
}
|
@ -5,6 +5,7 @@
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <IO/WithFileName.h>
|
||||
#include <IO/WithFileSize.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -86,7 +87,16 @@ try
|
||||
buf = read_buffer_iterator.next();
|
||||
if (!buf)
|
||||
break;
|
||||
is_eof = buf->eof();
|
||||
|
||||
/// We just want to check for eof, but eof() can be pretty expensive.
|
||||
/// So we use getFileSize() when available, which has better worst case.
|
||||
/// (For remote files, typically eof() would read 1 MB from S3, which may be much
|
||||
/// more than what the schema reader and even data reader will read).
|
||||
auto size = tryGetFileSizeFromReadBuffer(*buf);
|
||||
if (size.has_value())
|
||||
is_eof = *size == 0;
|
||||
else
|
||||
is_eof = buf->eof();
|
||||
}
|
||||
catch (Exception & e)
|
||||
{
|
||||
|
@ -6,7 +6,6 @@
|
||||
#include <Columns/ColumnNullable.h>
|
||||
#include <Columns/ColumnLowCardinality.h>
|
||||
#include <Common/assert_cast.h>
|
||||
#include <DataTypes/DataTypeNullable.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
|
@ -199,6 +199,7 @@ public:
|
||||
|
||||
/// Parse JSON for every row
|
||||
Impl impl;
|
||||
GeneratorJSONPath<JSONParser> generator_json_path(res);
|
||||
for (const auto i : collections::range(0, input_rows_count))
|
||||
{
|
||||
std::string_view json{
|
||||
@ -208,7 +209,9 @@ public:
|
||||
bool added_to_column = false;
|
||||
if (document_ok)
|
||||
{
|
||||
added_to_column = impl.insertResultToColumn(*to, document, res, context);
|
||||
/// Instead of creating a new generator for each row, we can reuse the same one.
|
||||
generator_json_path.reinitialize();
|
||||
added_to_column = impl.insertResultToColumn(*to, document, generator_json_path, context);
|
||||
}
|
||||
if (!added_to_column)
|
||||
{
|
||||
@ -287,9 +290,8 @@ public:
|
||||
|
||||
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
|
||||
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr &)
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr &)
|
||||
{
|
||||
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
|
||||
Element current_element = root;
|
||||
VisitorStatus status;
|
||||
while ((status = generator_json_path.getNextItem(current_element)) != VisitorStatus::Exhausted)
|
||||
@ -337,9 +339,8 @@ public:
|
||||
|
||||
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
|
||||
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr & context)
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr & context)
|
||||
{
|
||||
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
|
||||
Element current_element = root;
|
||||
VisitorStatus status;
|
||||
|
||||
@ -405,11 +406,10 @@ public:
|
||||
|
||||
static size_t getNumberOfIndexArguments(const ColumnsWithTypeAndName & arguments) { return arguments.size() - 1; }
|
||||
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, ASTPtr & query_ptr, const ContextPtr &)
|
||||
static bool insertResultToColumn(IColumn & dest, const Element & root, GeneratorJSONPath<JSONParser> & generator_json_path, const ContextPtr &)
|
||||
{
|
||||
ColumnString & col_str = assert_cast<ColumnString &>(dest);
|
||||
|
||||
GeneratorJSONPath<JSONParser> generator_json_path(query_ptr);
|
||||
Element current_element = root;
|
||||
VisitorStatus status;
|
||||
bool success = false;
|
||||
|
@ -105,6 +105,16 @@ public:
|
||||
}
|
||||
}
|
||||
|
||||
void reinitialize()
|
||||
{
|
||||
while (current_visitor >= 0)
|
||||
{
|
||||
visitors[current_visitor]->reinitialize();
|
||||
current_visitor--;
|
||||
}
|
||||
current_visitor = 0;
|
||||
}
|
||||
|
||||
private:
|
||||
bool updateVisitorsForNextRun()
|
||||
{
|
||||
|
@ -321,7 +321,7 @@ namespace
|
||||
/// To avoid such a deadlock we unlock `lock` before entering `pool_ptr->second->get`.
|
||||
lock.unlock();
|
||||
|
||||
auto retry_timeout = timeouts.connection_timeout.totalMicroseconds();
|
||||
auto retry_timeout = timeouts.connection_timeout.totalMilliseconds();
|
||||
auto session = pool_ptr->second->get(retry_timeout);
|
||||
|
||||
setTimeouts(*session, timeouts);
|
||||
|
@ -114,7 +114,8 @@ void SelectStreamFactory::createForShard(
|
||||
ContextPtr context,
|
||||
std::vector<QueryPlanPtr> & local_plans,
|
||||
Shards & remote_shards,
|
||||
UInt32 shard_count)
|
||||
UInt32 shard_count,
|
||||
bool parallel_replicas_enabled)
|
||||
{
|
||||
auto it = objects_by_shard.find(shard_info.shard_num);
|
||||
if (it != objects_by_shard.end())
|
||||
@ -146,7 +147,10 @@ void SelectStreamFactory::createForShard(
|
||||
return;
|
||||
});
|
||||
|
||||
if (settings.prefer_localhost_replica && shard_info.isLocal())
|
||||
// prefer_localhost_replica is not effective in case of parallel replicas
|
||||
// (1) prefer_localhost_replica is about choosing one replica on a shard
|
||||
// (2) parallel replica coordinator has own logic to choose replicas to read from
|
||||
if (settings.prefer_localhost_replica && shard_info.isLocal() && !parallel_replicas_enabled)
|
||||
{
|
||||
StoragePtr main_table_storage;
|
||||
|
||||
@ -187,7 +191,7 @@ void SelectStreamFactory::createForShard(
|
||||
return;
|
||||
}
|
||||
|
||||
UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
|
||||
const UInt64 max_allowed_delay = settings.max_replica_delay_for_distributed_queries;
|
||||
|
||||
if (!max_allowed_delay)
|
||||
{
|
||||
|
@ -78,7 +78,8 @@ public:
|
||||
ContextPtr context,
|
||||
std::vector<QueryPlanPtr> & local_plans,
|
||||
Shards & remote_shards,
|
||||
UInt32 shard_count);
|
||||
UInt32 shard_count,
|
||||
bool parallel_replicas_enabled);
|
||||
|
||||
struct ShardPlans
|
||||
{
|
||||
|
@ -205,9 +205,11 @@ void executeQuery(
|
||||
|
||||
new_context->increaseDistributedDepth();
|
||||
|
||||
size_t shards = cluster->getShardCount();
|
||||
for (const auto & shard_info : cluster->getShardsInfo())
|
||||
const size_t shards = cluster->getShardCount();
|
||||
for (size_t i = 0, s = cluster->getShardsInfo().size(); i < s; ++i)
|
||||
{
|
||||
const auto & shard_info = cluster->getShardsInfo()[i];
|
||||
|
||||
ASTPtr query_ast_for_shard = query_ast->clone();
|
||||
if (sharding_key_expr && query_info.optimized_cluster && settings.optimize_skip_unused_shards_rewrite_in && shards > 1)
|
||||
{
|
||||
@ -237,9 +239,15 @@ void executeQuery(
|
||||
}
|
||||
}
|
||||
|
||||
// decide for each shard if parallel reading from replicas should be enabled
|
||||
// according to settings and number of replicas declared per shard
|
||||
const auto & addresses = cluster->getShardsAddresses().at(i);
|
||||
bool parallel_replicas_enabled = addresses.size() > 1 && context->canUseParallelReplicas();
|
||||
|
||||
stream_factory.createForShard(shard_info,
|
||||
query_ast_for_shard, main_table, table_func_ptr,
|
||||
new_context, plans, remote_shards, static_cast<UInt32>(shards));
|
||||
new_context, plans, remote_shards, static_cast<UInt32>(shards),
|
||||
parallel_replicas_enabled);
|
||||
}
|
||||
|
||||
if (!remote_shards.empty())
|
||||
@ -263,7 +271,7 @@ void executeQuery(
|
||||
log,
|
||||
shards,
|
||||
query_info.storage_limits,
|
||||
query_info.getCluster()->getName());
|
||||
not_optimized_cluster->getName());
|
||||
|
||||
read_from_remote->setStepDescription("Read from remote replica");
|
||||
plan->addStep(std::move(read_from_remote));
|
||||
|
@ -16,6 +16,7 @@ namespace DB
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int INVALID_IDENTIFIER;
|
||||
}
|
||||
|
||||
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableIdentifier & identifier, const String & current_database)
|
||||
@ -37,7 +38,7 @@ DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTIdentifier & ident
|
||||
else if (identifier.name_parts.size() == 1)
|
||||
table = identifier.name_parts[0];
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: invalid identifier");
|
||||
throw Exception(ErrorCodes::INVALID_IDENTIFIER, "Invalid identifier");
|
||||
|
||||
if (database.empty())
|
||||
database = current_database;
|
||||
@ -50,7 +51,7 @@ DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTPtr & node, const
|
||||
else if (const auto * identifier = node->as<ASTIdentifier>())
|
||||
*this = DatabaseAndTableWithAlias(*identifier, current_database);
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Logical error: identifier or table identifier expected");
|
||||
throw Exception(ErrorCodes::INVALID_IDENTIFIER, "Identifier or table identifier expected");
|
||||
}
|
||||
|
||||
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database)
|
||||
|
@ -31,6 +31,7 @@
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Storages/WindowView/StorageWindowView.h>
|
||||
#include <Storages/StorageReplicatedMergeTree.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Interpreters/executeDDLQueryOnCluster.h>
|
||||
@ -837,6 +838,13 @@ void InterpreterCreateQuery::validateTableStructure(const ASTCreateQuery & creat
|
||||
"Cannot create table with column '{}' for *MergeTree engines because it "
|
||||
"is reserved for lightweight delete feature",
|
||||
LightweightDeleteDescription::FILTER_COLUMN.name);
|
||||
|
||||
auto search_block_number = all_columns.find(BlockNumberColumn::name);
|
||||
if (search_block_number != all_columns.end())
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
|
||||
"Cannot create table with column '{}' for *MergeTree engines because it "
|
||||
"is reserved for storing block number",
|
||||
BlockNumberColumn::name);
|
||||
}
|
||||
|
||||
const auto & settings = getContext()->getSettingsRef();
|
||||
|
@ -7,6 +7,7 @@
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Storages/MergeTree/StorageFromMergeTreeDataPart.h>
|
||||
#include <Storages/StorageMergeTree.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
#include <Processors/Transforms/FilterTransform.h>
|
||||
#include <Processors/Transforms/ExpressionTransform.h>
|
||||
#include <Processors/Transforms/CreatingSetsTransform.h>
|
||||
@ -40,7 +41,6 @@
|
||||
#include <Parsers/makeASTForLogicalFunction.h>
|
||||
#include <Common/logger_useful.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
@ -56,6 +56,7 @@ namespace ErrorCodes
|
||||
extern const int THERE_IS_NO_COLUMN;
|
||||
}
|
||||
|
||||
|
||||
namespace
|
||||
{
|
||||
|
||||
@ -416,6 +417,12 @@ static void validateUpdateColumns(
|
||||
found = true;
|
||||
}
|
||||
|
||||
/// Don't allow overriding the value of the block number virtual column
|
||||
if (!found && column_name == BlockNumberColumn::name)
|
||||
{
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Update is not supported for virtual column {} ", backQuote(column_name));
|
||||
}
|
||||
|
||||
if (!found)
|
||||
{
|
||||
for (const auto & col : metadata_snapshot->getColumns().getMaterialized())
|
||||
@ -511,7 +518,8 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
|
||||
for (const auto & [name, _] : command.column_to_update_expression)
|
||||
{
|
||||
if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name)
|
||||
if (!available_columns_set.contains(name) && name != LightweightDeleteDescription::FILTER_COLUMN.name
|
||||
&& name != BlockNumberColumn::name)
|
||||
throw Exception(ErrorCodes::THERE_IS_NO_COLUMN,
|
||||
"Column {} is updated but not requested to read", name);
|
||||
|
||||
@ -613,6 +621,8 @@ void MutationsInterpreter::prepare(bool dry_run)
|
||||
type = physical_column->type;
|
||||
else if (column == LightweightDeleteDescription::FILTER_COLUMN.name)
|
||||
type = LightweightDeleteDescription::FILTER_COLUMN.type;
|
||||
else if (column == BlockNumberColumn::name)
|
||||
type = BlockNumberColumn::type;
|
||||
else
|
||||
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unknown column {}", column);
|
||||
|
||||
@ -1087,6 +1097,18 @@ struct VirtualColumns
|
||||
|
||||
virtuals.emplace_back(ColumnAndPosition{.column = std::move(column), .position = i});
|
||||
}
|
||||
else if (columns_to_read[i] == BlockNumberColumn::name)
|
||||
{
|
||||
if (!part->getColumns().contains(BlockNumberColumn::name))
|
||||
{
|
||||
ColumnWithTypeAndName block_number_column;
|
||||
block_number_column.type = BlockNumberColumn::type;
|
||||
block_number_column.column = block_number_column.type->createColumnConst(0, part->info.min_block);
|
||||
block_number_column.name = std::move(columns_to_read[i]);
|
||||
|
||||
virtuals.emplace_back(ColumnAndPosition{.column = std::move(block_number_column), .position = i});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (!virtuals.empty())
|
||||
|
@ -48,7 +48,7 @@ static bool equals(const DataTypes & lhs, const DataTypes & rhs)
|
||||
|
||||
FutureSetFromStorage::FutureSetFromStorage(SetPtr set_) : set(std::move(set_)) {}
|
||||
SetPtr FutureSetFromStorage::get() const { return set; }
|
||||
const DataTypes & FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
|
||||
DataTypes FutureSetFromStorage::getTypes() const { return set->getElementsTypes(); }
|
||||
|
||||
SetPtr FutureSetFromStorage::buildOrderedSetInplace(const ContextPtr &)
|
||||
{
|
||||
@ -73,7 +73,7 @@ FutureSetFromTuple::FutureSetFromTuple(Block block, const Settings & settings)
|
||||
set->finishInsert();
|
||||
}
|
||||
|
||||
const DataTypes & FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
|
||||
DataTypes FutureSetFromTuple::getTypes() const { return set->getElementsTypes(); }
|
||||
|
||||
SetPtr FutureSetFromTuple::buildOrderedSetInplace(const ContextPtr & context)
|
||||
{
|
||||
@ -138,7 +138,7 @@ void FutureSetFromSubquery::setQueryPlan(std::unique_ptr<QueryPlan> source_)
|
||||
set_and_key->set->setHeader(source->getCurrentDataStream().header.getColumnsWithTypeAndName());
|
||||
}
|
||||
|
||||
const DataTypes & FutureSetFromSubquery::getTypes() const
|
||||
DataTypes FutureSetFromSubquery::getTypes() const
|
||||
{
|
||||
return set_and_key->set->getElementsTypes();
|
||||
}
|
||||
@ -183,7 +183,10 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
|
||||
{
|
||||
auto set = external_table_set->buildOrderedSetInplace(context);
|
||||
if (set)
|
||||
return set_and_key->set = set;
|
||||
{
|
||||
set_and_key->set = set;
|
||||
return set_and_key->set;
|
||||
}
|
||||
}
|
||||
|
||||
auto plan = build(context);
|
||||
|
@ -47,7 +47,7 @@ public:
|
||||
/// Returns set if set is ready (created and filled) or nullptr if not.
|
||||
virtual SetPtr get() const = 0;
|
||||
/// Returns set->getElementsTypes(), even if set is not created yet.
|
||||
virtual const DataTypes & getTypes() const = 0;
|
||||
virtual DataTypes getTypes() const = 0;
|
||||
/// If possible, return set with stored elements useful for PK analysis.
|
||||
virtual SetPtr buildOrderedSetInplace(const ContextPtr & context) = 0;
|
||||
};
|
||||
@ -62,7 +62,7 @@ public:
|
||||
FutureSetFromStorage(SetPtr set_);
|
||||
|
||||
SetPtr get() const override;
|
||||
const DataTypes & getTypes() const override;
|
||||
DataTypes getTypes() const override;
|
||||
SetPtr buildOrderedSetInplace(const ContextPtr &) override;
|
||||
|
||||
private:
|
||||
@ -79,7 +79,7 @@ public:
|
||||
SetPtr get() const override { return set; }
|
||||
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
|
||||
|
||||
const DataTypes & getTypes() const override;
|
||||
DataTypes getTypes() const override;
|
||||
|
||||
private:
|
||||
SetPtr set;
|
||||
@ -105,7 +105,7 @@ public:
|
||||
const Settings & settings);
|
||||
|
||||
SetPtr get() const override;
|
||||
const DataTypes & getTypes() const override;
|
||||
DataTypes getTypes() const override;
|
||||
SetPtr buildOrderedSetInplace(const ContextPtr & context) override;
|
||||
|
||||
std::unique_ptr<QueryPlan> build(const ContextPtr & context);
|
||||
|
@ -94,11 +94,12 @@ namespace DB
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS;
|
||||
extern const int INTO_OUTFILE_NOT_ALLOWED;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
extern const int INVALID_TRANSACTION;
|
||||
extern const int LOGICAL_ERROR;
|
||||
extern const int NOT_IMPLEMENTED;
|
||||
extern const int QUERY_WAS_CANCELLED;
|
||||
}
|
||||
|
||||
|
||||
@ -991,7 +992,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
|
||||
if (!async_insert)
|
||||
{
|
||||
/// If it is a non-internal SELECT, and passive/read use of the query cache is enabled, and the cache knows the query, then set
|
||||
/// If it is a non-internal SELECT, and passive (read) use of the query cache is enabled, and the cache knows the query, then set
|
||||
/// a pipeline with a source populated by the query cache.
|
||||
auto get_result_from_query_cache = [&]()
|
||||
{
|
||||
@ -1091,11 +1092,14 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
|
||||
|
||||
res = interpreter->execute();
|
||||
|
||||
/// If it is a non-internal SELECT query, and active/write use of the query cache is enabled, then add a processor on
|
||||
/// If it is a non-internal SELECT query, and active (write) use of the query cache is enabled, then add a processor on
|
||||
/// top of the pipeline which stores the result in the query cache.
|
||||
if (can_use_query_cache && settings.enable_writes_to_query_cache
|
||||
&& (!astContainsNonDeterministicFunctions(ast, context) || settings.query_cache_store_results_of_queries_with_nondeterministic_functions))
|
||||
if (can_use_query_cache && settings.enable_writes_to_query_cache)
|
||||
{
|
||||
if (astContainsNonDeterministicFunctions(ast, context) && !settings.query_cache_store_results_of_queries_with_nondeterministic_functions)
|
||||
throw Exception(ErrorCodes::CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS,
|
||||
"Unable to cache the query result because the query contains a non-deterministic function. Use setting query_cache_store_results_of_queries_with_nondeterministic_functions = 1 to store the query result regardless.");
|
||||
|
||||
QueryCache::Key key(
|
||||
ast, res.pipeline.getHeader(),
|
||||
context->getUserName(), settings.query_cache_share_between_users,
|
||||
|
@ -20,6 +20,7 @@
|
||||
#include <Columns/ColumnArray.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Storages/StorageInMemoryMetadata.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -260,7 +261,7 @@ void fillMissingColumns(
|
||||
const NamesAndTypesList & requested_columns,
|
||||
const NamesAndTypesList & available_columns,
|
||||
const NameSet & partially_read_columns,
|
||||
StorageMetadataPtr metadata_snapshot)
|
||||
StorageMetadataPtr metadata_snapshot, size_t block_number)
|
||||
{
|
||||
size_t num_columns = requested_columns.size();
|
||||
if (num_columns != res_columns.size())
|
||||
@ -339,9 +340,14 @@ void fillMissingColumns(
|
||||
}
|
||||
else
|
||||
{
|
||||
/// We must turn a constant column into a full column because the interpreter could infer
|
||||
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
|
||||
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
|
||||
if (requested_column->name == BlockNumberColumn::name)
|
||||
res_columns[i] = type->createColumnConst(num_rows, block_number)->convertToFullColumnIfConst();
|
||||
else
|
||||
/// We must turn a constant column into a full column because the interpreter could infer
|
||||
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
|
||||
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
|
||||
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -46,6 +46,6 @@ void fillMissingColumns(
|
||||
const NamesAndTypesList & requested_columns,
|
||||
const NamesAndTypesList & available_columns,
|
||||
const NameSet & partially_read_columns,
|
||||
StorageMetadataPtr metadata_snapshot);
|
||||
StorageMetadataPtr metadata_snapshot, size_t block_number = 0);
|
||||
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
#include <string_view>
|
||||
#include <regex>
|
||||
#include <gtest/gtest.h>
|
||||
#include <boost/algorithm/string/replace.hpp>
|
||||
|
||||
namespace
|
||||
{
|
||||
@ -39,7 +40,11 @@ std::ostream & operator<<(std::ostream & ostr, const std::shared_ptr<IParser> pa
|
||||
|
||||
std::ostream & operator<<(std::ostream & ostr, const ParserTestCase & test_case)
|
||||
{
|
||||
return ostr << "ParserTestCase input: " << test_case.input_text;
|
||||
// New line characters are removed because at the time of writing this the unit test results are parsed from the
|
||||
// command line output, and multi-line string representations are breaking the parsing logic.
|
||||
std::string input_text{test_case.input_text};
|
||||
boost::replace_all(input_text, "\n", "\\n");
|
||||
return ostr << "ParserTestCase input: " << input_text;
|
||||
}
|
||||
|
||||
class ParserTest : public ::testing::TestWithParam<std::tuple<std::shared_ptr<IParser>, ParserTestCase>>
|
||||
@ -494,11 +499,11 @@ INSTANTIATE_TEST_SUITE_P(
|
||||
::testing::Values(std::make_shared<ParserPRQLQuery>(kDummyMaxQuerySize, kDummyMaxParserDepth)),
|
||||
::testing::ValuesIn(std::initializer_list<ParserTestCase>{
|
||||
{
|
||||
"from albums\ngroup [author_id] (\n aggregate [first_pushlied = min published]\n)\njoin a=author side:left [==author_id]\njoin p=purchases side:right [==author_id]\ngroup [a.id, p.purchase_id] (\n aggregate [avg_sell = min first_pushlied]\n)",
|
||||
"WITH table_1 AS\n (\n SELECT\n MIN(published) AS _expr_0,\n author_id\n FROM albums\n GROUP BY author_id\n )\nSELECT\n a.id,\n p.purchase_id,\n MIN(table_0._expr_0) AS avg_sell\nFROM table_1 AS table_0\nLEFT JOIN author AS a ON table_0.author_id = a.author_id\nRIGHT JOIN purchases AS p ON table_0.author_id = p.author_id\nGROUP BY\n a.id,\n p.purchase_id",
|
||||
"from albums\ngroup {author_id} (\n aggregate {first_published = min published}\n)\njoin a=author side:left (==author_id)\njoin p=purchases side:right (==author_id)\ngroup {a.id, p.purchase_id} (\n aggregate {avg_sell = min first_published}\n)",
|
||||
"WITH table_0 AS\n (\n SELECT\n MIN(published) AS _expr_0,\n author_id\n FROM albums\n GROUP BY author_id\n )\nSELECT\n a.id,\n p.purchase_id,\n MIN(table_0._expr_0) AS avg_sell\nFROM table_0\nLEFT JOIN author AS a ON table_0.author_id = a.author_id\nRIGHT JOIN purchases AS p ON table_0.author_id = p.author_id\nGROUP BY\n a.id,\n p.purchase_id",
|
||||
},
|
||||
{
|
||||
"from matches\nfilter start_date > @2023-05-30 # Some comment here\nderive [\n some_derived_value_1 = a + (b ?? 0), # And there\n some_derived_value_2 = c + some_derived_value\n]\nfilter some_derived_value_2 > 0\ngroup [country, city] (\n aggregate [\n average some_derived_value_2,\n aggr = max some_derived_value_2,\n ]\n)\nderive place = f\"{city} in {country}\"\nderive country_code = s\"LEFT(country, 2)\"\nsort [aggr, -country]\ntake 1..20",
|
||||
"WITH\n table_3 AS\n (\n SELECT\n country,\n city,\n c + some_derived_value AS _expr_1\n FROM matches\n WHERE start_date > toDate('2023-05-30')\n ),\n table_1 AS\n (\n SELECT\n country,\n city,\n AVG(_expr_1) AS _expr_0,\n MAX(_expr_1) AS aggr\n FROM table_3 AS table_2\n WHERE _expr_1 > 0\n GROUP BY\n country,\n city\n )\nSELECT\n country,\n city,\n _expr_0,\n aggr,\n CONCAT(city, ' in ', country) AS place,\n LEFT(country, 2) AS country_code\nFROM table_1 AS table_0\nORDER BY\n aggr ASC,\n country DESC\nLIMIT 20",
|
||||
"from matches\nfilter start_date > @2023-05-30 # Some comment here\nderive {\n some_derived_value_1 = a + (b ?? 0), # And there\n some_derived_value_2 = c + some_derived_value\n}\nfilter some_derived_value_2 > 0\ngroup {country, city} (\n aggregate {\n average some_derived_value_2,\n aggr = max some_derived_value_2\n }\n)\nderive place = f\"{city} in {country}\"\nderive country_code = s\"LEFT(country, 2)\"\nsort {aggr, -country}\ntake 1..20",
|
||||
"WITH\n table_1 AS\n (\n SELECT\n country,\n city,\n c + some_derived_value AS _expr_1\n FROM matches\n WHERE start_date > toDate('2023-05-30')\n ),\n table_0 AS\n (\n SELECT\n country,\n city,\n AVG(_expr_1) AS _expr_0,\n MAX(_expr_1) AS aggr\n FROM table_1\n WHERE _expr_1 > 0\n GROUP BY\n country,\n city\n )\nSELECT\n country,\n city,\n _expr_0,\n aggr,\n CONCAT(city, ' in ', country) AS place,\n LEFT(country, 2) AS country_code\nFROM table_0\nORDER BY\n aggr ASC,\n country DESC\nLIMIT 20",
|
||||
},
|
||||
})));
|
||||
|
@ -130,7 +130,7 @@ static std::shared_ptr<parquet::FileMetaData> getFileMetadata(
|
||||
const FormatSettings & format_settings,
|
||||
std::atomic<int> & is_stopped)
|
||||
{
|
||||
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES);
|
||||
auto arrow_file = asArrowFile(in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
|
||||
return parquet::ReadMetaData(arrow_file);
|
||||
}
|
||||
|
||||
@ -495,12 +495,15 @@ NamesAndTypesList ParquetMetadataSchemaReader::readSchema()
|
||||
|
||||
void registerInputFormatParquetMetadata(FormatFactory & factory)
|
||||
{
|
||||
factory.registerInputFormat(
|
||||
factory.registerRandomAccessInputFormat(
|
||||
"ParquetMetadata",
|
||||
[](ReadBuffer &buf,
|
||||
const Block &sample,
|
||||
const RowInputFormatParams &,
|
||||
const FormatSettings & settings)
|
||||
[](ReadBuffer & buf,
|
||||
const Block & sample,
|
||||
const FormatSettings & settings,
|
||||
const ReadSettings &,
|
||||
bool /* is_remote_fs */,
|
||||
size_t /* max_download_threads */,
|
||||
size_t /* max_parsing_threads */)
|
||||
{
|
||||
return std::make_shared<ParquetMetadataInputFormat>(buf, sample, settings);
|
||||
});
|
||||
|
@ -28,7 +28,6 @@ ReplacingSortedAlgorithm::ReplacingSortedAlgorithm(
|
||||
, cleanup(cleanup_)
|
||||
, cleanedup_rows_count(cleanedup_rows_count_)
|
||||
{
|
||||
|
||||
if (!is_deleted_column.empty())
|
||||
is_deleted_column_number = header_.getPositionByName(is_deleted_column);
|
||||
if (!version_column.empty())
|
||||
@ -83,8 +82,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
|
||||
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
|
||||
if (!cleanup || !value)
|
||||
insertRow();
|
||||
else if (cleanedup_rows_count != nullptr)
|
||||
else if (cleanup && cleanedup_rows_count != nullptr)
|
||||
{
|
||||
*cleanedup_rows_count += current_row_sources.size();
|
||||
current_row_sources.resize(0);
|
||||
}
|
||||
}
|
||||
else
|
||||
insertRow();
|
||||
@ -141,8 +143,11 @@ IMergingAlgorithm::Status ReplacingSortedAlgorithm::merge()
|
||||
uint8_t value = assert_cast<const ColumnUInt8 &>(*(*selected_row.all_columns)[is_deleted_column_number]).getData()[selected_row.row_num];
|
||||
if (!cleanup || !value)
|
||||
insertRow();
|
||||
else if (cleanedup_rows_count != nullptr)
|
||||
else if (cleanup && cleanedup_rows_count != nullptr)
|
||||
{
|
||||
*cleanedup_rows_count += current_row_sources.size();
|
||||
current_row_sources.resize(0);
|
||||
}
|
||||
}
|
||||
else
|
||||
insertRow();
|
||||
|
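The two hunks above tighten the guard so that cleanedup_rows_count is only incremented when CLEANUP was actually requested. A minimal SQL sketch of the code path they touch, assuming a ReplacingMergeTree table with an is_deleted column (names are illustrative, modeled on the 00577 test further down in this diff):

CREATE TABLE t (uid String, version UInt32, is_deleted UInt8)
    ENGINE = ReplacingMergeTree(version, is_deleted) ORDER BY uid;
INSERT INTO t VALUES ('d1', 1, 0), ('d2', 1, 1);
-- CLEANUP physically drops rows whose is_deleted flag is set during the merge;
-- with the fixed guard, the cleaned-up row counter is bumped only on this path.
OPTIMIZE TABLE t FINAL CLEANUP;
SELECT * FROM t;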
@ -12,6 +12,7 @@
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
#include <DataTypes/DataTypeLowCardinality.h>
|
||||
#include <IO/WriteHelpers.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -222,6 +223,12 @@ static SummingSortedAlgorithm::ColumnsDefinition defineColumns(
|
||||
const ColumnWithTypeAndName & column = header.safeGetByPosition(i);
|
||||
|
||||
const auto * simple = dynamic_cast<const DataTypeCustomSimpleAggregateFunction *>(column.type->getCustomName());
|
||||
if (column.name == BlockNumberColumn::name)
|
||||
{
|
||||
def.column_numbers_not_to_aggregate.push_back(i);
|
||||
continue;
|
||||
}
|
||||
|
||||
/// Discover nested Maps and find columns for summation
|
||||
if (typeid_cast<const DataTypeArray *>(column.type.get()) && !simple)
|
||||
{
|
||||
|
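The hunk above keeps `_block_number` out of the columns that SummingMergeTree aggregates. A rough sketch of the observable effect, assuming the experimental setting is enabled (table and column names are hypothetical):

CREATE TABLE sums (k UInt32, v UInt64) ENGINE = SummingMergeTree ORDER BY k
    SETTINGS allow_experimental_block_number_column = true;
INSERT INTO sums VALUES (1, 10);
INSERT INTO sums VALUES (1, 20);
OPTIMIZE TABLE sums FINAL;
-- v is summed across the merged rows; _block_number is passed through, not aggregated.
SELECT k, v, _block_number FROM sums;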
@ -49,7 +49,8 @@ TTLTransform::TTLTransform(
|
||||
|
||||
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
|
||||
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
|
||||
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));
|
||||
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_,
|
||||
getInputPort().getHeader(), storage_));
|
||||
|
||||
if (metadata_snapshot_->hasAnyColumnTTL())
|
||||
{
|
||||
|
@ -27,6 +27,7 @@
|
||||
#include <Storages/AlterCommands.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <Storages/LightweightDeleteDescription.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
#include <Storages/MergeTree/MergeTreeData.h>
|
||||
#include <Common/typeid_cast.h>
|
||||
#include <Common/randomSeed.h>
|
||||
@ -782,7 +783,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
|
||||
/// Drop alias is metadata alter, in other case mutation is required.
|
||||
if (type == DROP_COLUMN)
|
||||
return metadata.columns.hasColumnOrNested(GetColumnsOptions::AllPhysical, column_name) ||
|
||||
column_name == LightweightDeleteDescription::FILTER_COLUMN.name;
|
||||
column_name == LightweightDeleteDescription::FILTER_COLUMN.name || column_name == BlockNumberColumn::name;
|
||||
|
||||
if (type != MODIFY_COLUMN || data_type == nullptr)
|
||||
return false;
|
||||
@ -1066,6 +1067,10 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot add column {}: "
|
||||
"this column name is reserved for lightweight delete feature", backQuote(column_name));
|
||||
|
||||
if (column_name == BlockNumberColumn::name && std::dynamic_pointer_cast<MergeTreeData>(table))
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot add column {}: "
|
||||
"this column name is reserved for _block_number persisting feature", backQuote(column_name));
|
||||
|
||||
if (command.codec)
|
||||
CompressionCodecFactory::instance().validateCodecAndGetPreprocessedAST(command.codec, command.data_type, !context->getSettingsRef().allow_suspicious_codecs, context->getSettingsRef().allow_experimental_codecs, context->getSettingsRef().enable_deflate_qpl_codec);
|
||||
|
||||
@ -1270,6 +1275,10 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot rename to {}: "
|
||||
"this column name is reserved for lightweight delete feature", backQuote(command.rename_to));
|
||||
|
||||
if (command.rename_to == BlockNumberColumn::name && std::dynamic_pointer_cast<MergeTreeData>(table))
|
||||
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Cannot rename to {}: "
|
||||
"this column name is reserved for _block_number persisting feature", backQuote(command.rename_to));
|
||||
|
||||
if (modified_columns.contains(column_name))
|
||||
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Cannot rename and modify the same column {} "
|
||||
"in a single ALTER query", backQuote(column_name));
|
||||
|
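The validation added above reserves `_block_number` for MergeTree tables. A small sketch of the statements it now rejects (the table is hypothetical; the error text comes from the exceptions above):

CREATE TABLE t (id UInt32, x UInt32) ENGINE = MergeTree ORDER BY id;
-- Both fail with ILLEGAL_COLUMN:
-- "this column name is reserved for _block_number persisting feature"
ALTER TABLE t ADD COLUMN _block_number UInt64;
ALTER TABLE t RENAME COLUMN x TO _block_number;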
23
src/Storages/BlockNumberColumn.cpp
Normal file
@ -0,0 +1,23 @@
#include <Storages/BlockNumberColumn.h>
#include <Compression/CompressionCodecMultiple.h>

namespace DB
{

CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);

CompressionCodecPtr getCompressionCodecForBlockNumberColumn()
{
    std::vector <CompressionCodecPtr> codecs;
    codecs.reserve(2);
    auto data_bytes_size = BlockNumberColumn::type->getSizeOfValueInMemory();
    codecs.emplace_back(getCompressionCodecDelta(data_bytes_size));
    codecs.emplace_back(CompressionCodecFactory::instance().get("LZ4", {}));
    return std::make_shared<CompressionCodecMultiple>(codecs);
}

const String BlockNumberColumn::name = "_block_number";
const DataTypePtr BlockNumberColumn::type = std::make_shared<DataTypeUInt64>();
const CompressionCodecPtr BlockNumberColumn::compression_codec = getCompressionCodecForBlockNumberColumn();

}
16
src/Storages/BlockNumberColumn.h
Normal file
@ -0,0 +1,16 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypesNumber.h>
#include <Compression/CompressionFactory.h>

namespace DB
{

struct BlockNumberColumn
{
    static const String name;
    static const DataTypePtr type;
    static const CompressionCodecPtr compression_codec;
};

}
@ -30,11 +30,15 @@
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
#include <Interpreters/FunctionNameNormalizer.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
|
||||
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int NO_SUCH_COLUMN_IN_TABLE;
|
||||
@ -721,11 +725,13 @@ CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_
|
||||
|
||||
CompressionCodecPtr ColumnsDescription::getCodecOrDefault(const String & column_name) const
|
||||
{
|
||||
assert (column_name != BlockNumberColumn::name);
|
||||
return getCodecOrDefault(column_name, CompressionCodecFactory::instance().getDefaultCodec());
|
||||
}
|
||||
|
||||
ASTPtr ColumnsDescription::getCodecDescOrDefault(const String & column_name, CompressionCodecPtr default_codec) const
|
||||
{
|
||||
assert (column_name != BlockNumberColumn::name);
|
||||
const auto it = columns.get<1>().find(column_name);
|
||||
|
||||
if (it == columns.get<1>().end() || !it->codec)
|
||||
|
@ -477,10 +477,6 @@ public:
|
||||
/// Moar hardening: this method is supposed to be used for debug assertions
|
||||
bool assertHasValidVersionMetadata() const;
|
||||
|
||||
/// Return hardlink count for part.
|
||||
/// Required for keep data on remote FS when part has shadow copies.
|
||||
UInt32 getNumberOfRefereneces() const;
|
||||
|
||||
/// True if the part supports lightweight delete mutate.
|
||||
bool supportLightweightDeleteMutate() const;
|
||||
|
||||
|
@ -62,7 +62,7 @@ const IMergeTreeReader::ValueSizeMap & IMergeTreeReader::getAvgValueSizeHints()
|
||||
return avg_value_size_hints;
|
||||
}
|
||||
|
||||
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows) const
|
||||
void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number) const
|
||||
{
|
||||
try
|
||||
{
|
||||
@ -71,7 +71,7 @@ void IMergeTreeReader::fillMissingColumns(Columns & res_columns, bool & should_e
|
||||
res_columns, num_rows,
|
||||
Nested::convertToSubcolumns(requested_columns),
|
||||
Nested::convertToSubcolumns(available_columns),
|
||||
partially_read_columns, storage_snapshot->metadata);
|
||||
partially_read_columns, storage_snapshot->metadata, block_number);
|
||||
|
||||
should_evaluate_missing_defaults = std::any_of(
|
||||
res_columns.begin(), res_columns.end(), [](const auto & column) { return column == nullptr; });
|
||||
|
@ -45,7 +45,7 @@ public:
|
||||
/// Add columns from ordered_names that are not present in the block.
|
||||
/// Missing columns are added in the order specified by ordered_names.
|
||||
/// num_rows is needed in case if all res_columns are nullptr.
|
||||
void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows) const;
|
||||
void fillMissingColumns(Columns & res_columns, bool & should_evaluate_missing_defaults, size_t num_rows, size_t block_number = 0) const;
|
||||
/// Evaluate defaulted columns if necessary.
|
||||
void evaluateMissingDefaults(Block additional_columns, Columns & res_columns) const;
|
||||
|
||||
|
@ -3,6 +3,7 @@
|
||||
#include <DataTypes/DataTypesNumber.h>
|
||||
#include <DataTypes/FieldToDataType.h>
|
||||
#include <DataTypes/getLeastSupertype.h>
|
||||
#include <DataTypes/Utils.h>
|
||||
#include <Interpreters/TreeRewriter.h>
|
||||
#include <Interpreters/ExpressionAnalyzer.h>
|
||||
#include <Interpreters/ExpressionActions.h>
|
||||
@ -1258,10 +1259,18 @@ bool KeyCondition::tryPrepareSetIndex(
|
||||
|
||||
const auto right_arg = func.getArgumentAt(1);
|
||||
|
||||
auto future_set = right_arg.tryGetPreparedSet(indexes_mapping, data_types);
|
||||
auto future_set = right_arg.tryGetPreparedSet();
|
||||
if (!future_set)
|
||||
return false;
|
||||
|
||||
const auto set_types = future_set->getTypes();
|
||||
size_t set_types_size = set_types.size();
|
||||
size_t indexes_mapping_size = indexes_mapping.size();
|
||||
|
||||
for (auto & index_mapping : indexes_mapping)
|
||||
if (index_mapping.tuple_index >= set_types_size)
|
||||
return false;
|
||||
|
||||
auto prepared_set = future_set->buildOrderedSetInplace(right_arg.getTreeContext().getQueryContext());
|
||||
if (!prepared_set)
|
||||
return false;
|
||||
@ -1270,11 +1279,72 @@ bool KeyCondition::tryPrepareSetIndex(
|
||||
if (!prepared_set->hasExplicitSetElements())
|
||||
return false;
|
||||
|
||||
prepared_set->checkColumnsNumber(left_args_count);
|
||||
for (size_t i = 0; i < indexes_mapping.size(); ++i)
|
||||
prepared_set->checkTypesEqual(indexes_mapping[i].tuple_index, data_types[i]);
|
||||
/** Try to convert set columns to primary key columns.
|
||||
* Example: SELECT id FROM test_table WHERE id IN (SELECT 1);
|
||||
* In this example table `id` column has type UInt64, Set column has type UInt8. To use index
|
||||
* we need to convert set column to primary key column.
|
||||
*/
|
||||
auto set_columns = prepared_set->getSetElements();
|
||||
assert(set_types_size == set_columns.size());
|
||||
|
||||
out.set_index = std::make_shared<MergeTreeSetIndex>(prepared_set->getSetElements(), std::move(indexes_mapping));
|
||||
for (size_t indexes_mapping_index = 0; indexes_mapping_index < indexes_mapping_size; ++indexes_mapping_index)
|
||||
{
|
||||
const auto & key_column_type = data_types[indexes_mapping_index];
|
||||
size_t set_element_index = indexes_mapping[indexes_mapping_index].tuple_index;
|
||||
auto set_element_type = set_types[set_element_index];
|
||||
auto set_column = set_columns[set_element_index];
|
||||
|
||||
if (canBeSafelyCasted(set_element_type, key_column_type))
|
||||
{
|
||||
set_columns[set_element_index] = castColumn({set_column, set_element_type, {}}, key_column_type);
|
||||
continue;
|
||||
}
|
||||
|
||||
if (!key_column_type->canBeInsideNullable())
|
||||
return false;
|
||||
|
||||
const NullMap * set_column_null_map = nullptr;
|
||||
|
||||
if (isNullableOrLowCardinalityNullable(set_element_type))
|
||||
{
|
||||
if (WhichDataType(set_element_type).isLowCardinality())
|
||||
{
|
||||
set_element_type = removeLowCardinality(set_element_type);
|
||||
set_column = set_column->convertToFullColumnIfLowCardinality();
|
||||
}
|
||||
|
||||
set_element_type = removeNullable(set_element_type);
|
||||
const auto & set_column_nullable = assert_cast<const ColumnNullable &>(*set_column);
|
||||
set_column_null_map = &set_column_nullable.getNullMapData();
|
||||
set_column = set_column_nullable.getNestedColumnPtr();
|
||||
}
|
||||
|
||||
auto nullable_set_column = castColumnAccurateOrNull({set_column, set_element_type, {}}, key_column_type);
|
||||
const auto & nullable_set_column_typed = assert_cast<const ColumnNullable &>(*nullable_set_column);
|
||||
const auto & nullable_set_column_null_map = nullable_set_column_typed.getNullMapData();
|
||||
size_t nullable_set_column_null_map_size = nullable_set_column_null_map.size();
|
||||
|
||||
IColumn::Filter filter(nullable_set_column_null_map_size);
|
||||
|
||||
if (set_column_null_map)
|
||||
{
|
||||
for (size_t i = 0; i < nullable_set_column_null_map_size; ++i)
|
||||
filter[i] = (*set_column_null_map)[i] || !nullable_set_column_null_map[i];
|
||||
|
||||
set_column = nullable_set_column_typed.filter(filter, 0);
|
||||
}
|
||||
else
|
||||
{
|
||||
for (size_t i = 0; i < nullable_set_column_null_map_size; ++i)
|
||||
filter[i] = !nullable_set_column_null_map[i];
|
||||
|
||||
set_column = nullable_set_column_typed.getNestedColumn().filter(filter, 0);
|
||||
}
|
||||
|
||||
set_columns[set_element_index] = std::move(set_column);
|
||||
}
|
||||
|
||||
out.set_index = std::make_shared<MergeTreeSetIndex>(set_columns, std::move(indexes_mapping));
|
||||
return true;
|
||||
}
|
||||
|
||||
|
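The comment in the hunk above already names the motivating query; written out as a sketch (the DDL is assumed, only the SELECT appears in the comment):

CREATE TABLE test_table (id UInt64) ENGINE = MergeTree ORDER BY id;
-- The literal 1 is inferred as UInt8 while the key column is UInt64;
-- the new code casts the set elements to the key type so the primary key index stays usable.
SELECT id FROM test_table WHERE id IN (SELECT 1);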
@ -218,6 +218,14 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
ctx->need_remove_expired_values = false;
|
||||
ctx->force_ttl = false;
|
||||
|
||||
if (supportsBlockNumberColumn(global_ctx) && !global_ctx->storage_columns.contains(BlockNumberColumn::name))
|
||||
{
|
||||
global_ctx->storage_columns.emplace_back(NameAndTypePair{BlockNumberColumn::name,BlockNumberColumn::type});
|
||||
global_ctx->all_column_names.emplace_back(BlockNumberColumn::name);
|
||||
global_ctx->gathering_columns.emplace_back(NameAndTypePair{BlockNumberColumn::name,BlockNumberColumn::type});
|
||||
global_ctx->gathering_column_names.emplace_back(BlockNumberColumn::name);
|
||||
}
|
||||
|
||||
SerializationInfo::Settings info_settings =
|
||||
{
|
||||
.ratio_of_defaults_for_sparse = global_ctx->data->getSettings()->ratio_of_defaults_for_sparse_serialization,
|
||||
@ -251,12 +259,12 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
|
||||
}
|
||||
}
|
||||
|
||||
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
|
||||
|
||||
const auto & local_part_min_ttl = global_ctx->new_data_part->ttl_infos.part_min_ttl;
|
||||
if (local_part_min_ttl && local_part_min_ttl <= global_ctx->time_of_merge)
|
||||
ctx->need_remove_expired_values = true;
|
||||
|
||||
global_ctx->new_data_part->setColumns(global_ctx->storage_columns, infos, global_ctx->metadata_snapshot->getMetadataVersion());
|
||||
|
||||
if (ctx->need_remove_expired_values && global_ctx->ttl_merges_blocker->isCancelled())
|
||||
{
|
||||
LOG_INFO(ctx->log, "Part {} has values with expired TTL, but merges with TTL are cancelled.", global_ctx->new_data_part->name);
|
||||
@ -998,6 +1006,17 @@ void MergeTask::ExecuteAndFinalizeHorizontalPart::createMergedStream()
|
||||
|
||||
if (global_ctx->deduplicate)
|
||||
{
|
||||
/// We don't want to deduplicate by block number column
|
||||
/// so if deduplicate_by_columns is empty, add all columns except _block_number
|
||||
if (supportsBlockNumberColumn(global_ctx) && global_ctx->deduplicate_by_columns.empty())
|
||||
{
|
||||
for (const auto & col : global_ctx->merging_column_names)
|
||||
{
|
||||
if (col != BlockNumberColumn::name)
|
||||
global_ctx->deduplicate_by_columns.emplace_back(col);
|
||||
}
|
||||
}
|
||||
|
||||
if (DistinctSortedTransform::isApplicable(header, sort_description, global_ctx->deduplicate_by_columns))
|
||||
res_pipe.addTransform(std::make_shared<DistinctSortedTransform>(
|
||||
res_pipe.getHeader(), sort_description, SizeLimits(), 0 /*limit_hint*/, global_ctx->deduplicate_by_columns));
|
||||
|
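Per the comment above, when OPTIMIZE ... DEDUPLICATE is run without an explicit column list, the merge now deduplicates over every merging column except `_block_number`. A hedged sketch, reusing the table from the 02668 test later in this diff:

-- Rows that differ only in _block_number are still treated as duplicates.
OPTIMIZE TABLE test FINAL DEDUPLICATE;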
@ -13,6 +13,7 @@
|
||||
#include <QueryPipeline/QueryPipeline.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
#include <Common/filesystemHelpers.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
#include <memory>
|
||||
#include <list>
|
||||
@ -388,6 +389,12 @@ private:
|
||||
|
||||
Stages::iterator stages_iterator = stages.begin();
|
||||
|
||||
/// Check for persisting block number column
|
||||
static bool supportsBlockNumberColumn(GlobalRuntimeContextPtr global_ctx)
|
||||
{
|
||||
return global_ctx->data->getSettings()->allow_experimental_block_number_column && global_ctx->metadata_snapshot->getGroupByTTLs().empty();
|
||||
}
|
||||
|
||||
};
|
||||
|
||||
/// FIXME
|
||||
|
@ -78,6 +78,7 @@
|
||||
#include <Storages/VirtualColumnUtils.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
|
||||
#include <Storages/MutationCommands.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
#include <boost/range/algorithm_ext/erase.hpp>
|
||||
#include <boost/algorithm/string/join.hpp>
|
||||
@ -3730,7 +3731,7 @@ void MergeTreeData::checkPartDynamicColumns(MutableDataPartPtr & part, DataParts
|
||||
const auto & part_columns = part->getColumns();
|
||||
for (const auto & part_column : part_columns)
|
||||
{
|
||||
if (part_column.name == LightweightDeleteDescription::FILTER_COLUMN.name)
|
||||
if (part_column.name == LightweightDeleteDescription::FILTER_COLUMN.name || part_column.name == BlockNumberColumn::name)
|
||||
continue;
|
||||
|
||||
auto storage_column = columns.getPhysical(part_column.name);
|
||||
@ -8269,6 +8270,7 @@ NamesAndTypesList MergeTreeData::getVirtuals() const
|
||||
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
|
||||
NameAndTypePair("_part_offset", std::make_shared<DataTypeUInt64>()),
|
||||
LightweightDeleteDescription::FILTER_COLUMN,
|
||||
NameAndTypePair(BlockNumberColumn::name, BlockNumberColumn::type),
|
||||
};
|
||||
}
|
||||
|
||||
|
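With `_block_number` listed among the virtual columns above, it can be read even from tables that do not persist it; in that case the value is filled from the part's minimum block number (see injectNonConstVirtualColumns further down). A minimal sketch, assuming any MergeTree table named test:

SELECT id, _block_number, _part FROM test ORDER BY id;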
@ -5,6 +5,7 @@
|
||||
#include <Interpreters/Context.h>
|
||||
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
|
||||
#include <Compression/CompressedReadBufferFromFile.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
|
||||
namespace DB
|
||||
@ -64,6 +65,12 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
|
||||
ordered_columns_list.sort([this](const auto & lhs, const auto & rhs)
|
||||
{ return *getColumnPosition(lhs.name) < *getColumnPosition(rhs.name); });
|
||||
|
||||
/// _block_number column is not added by user, but is persisted in a part after merge
|
||||
/// If _block_number is not present in the parts to be merged, then it won't have a position
|
||||
/// So check if it's not present and add it at the end
|
||||
if (columns_list.contains(BlockNumberColumn::name) && !ordered_columns_list.contains(BlockNumberColumn::name))
|
||||
ordered_columns_list.emplace_back(NameAndTypePair{BlockNumberColumn::name, BlockNumberColumn::type});
|
||||
|
||||
return std::make_unique<MergeTreeDataPartWriterCompact>(
|
||||
shared_from_this(), ordered_columns_list, metadata_snapshot,
|
||||
indices_to_recalc, getMarksFileExtension(),
|
||||
|
@ -1,9 +1,12 @@
|
||||
#include <Storages/MergeTree/MergeTreeDataPartWriterCompact.h>
|
||||
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
|
||||
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
@ -53,7 +56,14 @@ MergeTreeDataPartWriterCompact::MergeTreeDataPartWriterCompact(
|
||||
|
||||
const auto & storage_columns = metadata_snapshot->getColumns();
|
||||
for (const auto & column : columns_list)
|
||||
addStreams(column, storage_columns.getCodecDescOrDefault(column.name, default_codec));
|
||||
{
|
||||
ASTPtr compression;
|
||||
if (column.name == BlockNumberColumn::name)
|
||||
compression = BlockNumberColumn::compression_codec->getFullCodecDesc();
|
||||
else
|
||||
compression = storage_columns.getCodecDescOrDefault(column.name, default_codec);
|
||||
addStreams(column, compression);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterCompact::addStreams(const NameAndTypePair & column, const ASTPtr & effective_codec_desc)
|
||||
|
@ -6,9 +6,12 @@
|
||||
#include <Common/escapeForFileName.h>
|
||||
#include <Columns/ColumnSparse.h>
|
||||
#include <Common/logger_useful.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
namespace DB
|
||||
{
|
||||
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int LOGICAL_ERROR;
|
||||
@ -87,7 +90,14 @@ MergeTreeDataPartWriterWide::MergeTreeDataPartWriterWide(
|
||||
{
|
||||
const auto & columns = metadata_snapshot->getColumns();
|
||||
for (const auto & it : columns_list)
|
||||
addStreams(it, columns.getCodecDescOrDefault(it.name, default_codec));
|
||||
{
|
||||
ASTPtr compression;
|
||||
if (it.name == BlockNumberColumn::name)
|
||||
compression = BlockNumberColumn::compression_codec->getFullCodecDesc();
|
||||
else
|
||||
compression = columns.getCodecDescOrDefault(it.name, default_codec);
|
||||
addStreams(it, compression);
|
||||
}
|
||||
}
|
||||
|
||||
void MergeTreeDataPartWriterWide::addStreams(
|
||||
|
@ -46,7 +46,7 @@
|
||||
#include <Functions/IFunction.h>
|
||||
|
||||
#include <IO/WriteBufferFromOStream.h>
|
||||
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
#include <Storages/MergeTree/ApproximateNearestNeighborIndexesCommon.h>
|
||||
|
||||
namespace CurrentMetrics
|
||||
@ -1232,6 +1232,10 @@ static void selectColumnNames(
|
||||
{
|
||||
virt_column_names.push_back(name);
|
||||
}
|
||||
else if (name == BlockNumberColumn::name)
|
||||
{
|
||||
virt_column_names.push_back(name);
|
||||
}
|
||||
else if (name == "_part_uuid")
|
||||
{
|
||||
virt_column_names.push_back(name);
|
||||
|
@ -9,6 +9,7 @@
|
||||
#include <DataTypes/DataTypeUUID.h>
|
||||
#include <DataTypes/DataTypeArray.h>
|
||||
#include <Processors/Transforms/AggregatingTransform.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
#include <city.h>
|
||||
|
||||
namespace DB
|
||||
@ -24,7 +25,8 @@ namespace ErrorCodes
|
||||
static void injectNonConstVirtualColumns(
|
||||
size_t rows,
|
||||
Block & block,
|
||||
const Names & virtual_columns);
|
||||
const Names & virtual_columns,
|
||||
MergeTreeReadTask * task = nullptr);
|
||||
|
||||
static void injectPartConstVirtualColumns(
|
||||
size_t rows,
|
||||
@ -247,7 +249,8 @@ namespace
|
||||
static void injectNonConstVirtualColumns(
|
||||
size_t rows,
|
||||
Block & block,
|
||||
const Names & virtual_columns)
|
||||
const Names & virtual_columns,
|
||||
MergeTreeReadTask * task)
|
||||
{
|
||||
VirtualColumnsInserter inserter(block);
|
||||
for (const auto & virtual_column_name : virtual_columns)
|
||||
@ -278,6 +281,24 @@ static void injectNonConstVirtualColumns(
|
||||
|
||||
inserter.insertUInt8Column(column, virtual_column_name);
|
||||
}
|
||||
|
||||
if (virtual_column_name == BlockNumberColumn::name)
|
||||
{
|
||||
ColumnPtr column;
|
||||
if (rows)
|
||||
{
|
||||
size_t value = 0;
|
||||
if (task)
|
||||
{
|
||||
value = task->getInfo().data_part ? task->getInfo().data_part->info.min_block : 0;
|
||||
}
|
||||
column = BlockNumberColumn::type->createColumnConst(rows, value)->convertToFullColumnIfConst();
|
||||
}
|
||||
else
|
||||
column = BlockNumberColumn::type->createColumn();
|
||||
|
||||
inserter.insertUInt64Column(column, virtual_column_name);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -368,7 +389,7 @@ void MergeTreeSelectProcessor::injectVirtualColumns(
|
||||
{
|
||||
/// First add non-const columns that are filled by the range reader and then const columns that we will fill ourselves.
|
||||
/// Note that the order is important: virtual columns filled by the range reader must go first
|
||||
injectNonConstVirtualColumns(row_count, block, virtual_columns);
|
||||
injectNonConstVirtualColumns(row_count, block, virtual_columns,task);
|
||||
injectPartConstVirtualColumns(row_count, block, task, partition_value_type, virtual_columns);
|
||||
}
|
||||
|
||||
|
@ -176,7 +176,7 @@ try
|
||||
current_mark += (rows_to_read == rows_read);
|
||||
|
||||
bool should_evaluate_missing_defaults = false;
|
||||
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read);
|
||||
reader->fillMissingColumns(columns, should_evaluate_missing_defaults, rows_read, data_part->info.min_block);
|
||||
|
||||
if (should_evaluate_missing_defaults)
|
||||
{
|
||||
|
@ -171,7 +171,8 @@ struct Settings;
|
||||
M(UInt64, part_moves_between_shards_delay_seconds, 30, "Time to wait before/after moving parts between shards.", 0) \
|
||||
M(Bool, allow_remote_fs_zero_copy_replication, false, "Don't use this setting in production, because it is not ready.", 0) \
|
||||
M(String, remote_fs_zero_copy_zookeeper_path, "/clickhouse/zero_copy", "ZooKeeper path for zero-copy table-independent info.", 0) \
|
||||
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
|
||||
M(Bool, remote_fs_zero_copy_path_compatible_mode, false, "Run zero-copy in compatible mode during conversion process.", 0) \
|
||||
M(Bool, allow_experimental_block_number_column, false, "Enable persisting column _block_number for each row.", 0) \
|
||||
\
|
||||
/** Compress marks and primary key. */ \
|
||||
M(Bool, compress_marks, true, "Marks support compression, reduce mark file size and speed up network transmission.", 0) \
|
||||
|
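The new MergeTree setting is off by default, so persisting `_block_number` is opted into per table. The 02668 test below does exactly this; a condensed sketch:

CREATE TABLE test (id UInt32, a UInt32) ENGINE = MergeTree ORDER BY id
    SETTINGS allow_experimental_block_number_column = true;
INSERT INTO test (id, a) VALUES (1, 1), (2, 2), (3, 3);
-- once persisted, _block_number survives merges
OPTIMIZE TABLE test FINAL;
SELECT id, a, _block_number, _part FROM test ORDER BY id;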
@ -351,64 +351,6 @@ FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(const DataTypes & data_types)
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
FutureSetPtr RPNBuilderTreeNode::tryGetPreparedSet(
|
||||
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
|
||||
const DataTypes & data_types) const
|
||||
{
|
||||
const auto & prepared_sets = getTreeContext().getPreparedSets();
|
||||
|
||||
/// We have `PreparedSetKey::forLiteral` but it is useless here as we don't have enough information
|
||||
/// about types in left argument of the IN operator. Instead, we manually iterate through all the sets
|
||||
/// and find the one for the right arg based on the AST structure (getTreeHash), after that we check
|
||||
/// that the types it was prepared with are compatible with the types of the primary key.
|
||||
auto types_match = [&indexes_mapping, &data_types](const DataTypes & set_types)
|
||||
{
|
||||
assert(indexes_mapping.size() == data_types.size());
|
||||
|
||||
for (size_t i = 0; i < indexes_mapping.size(); ++i)
|
||||
{
|
||||
if (indexes_mapping[i].tuple_index >= set_types.size())
|
||||
return false;
|
||||
|
||||
auto lhs = removeNullable(recursiveRemoveLowCardinality(data_types[i]));
|
||||
auto rhs = removeNullable(recursiveRemoveLowCardinality(set_types[indexes_mapping[i].tuple_index]));
|
||||
|
||||
if (!lhs->equals(*rhs))
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
};
|
||||
|
||||
if (prepared_sets && ast_node)
|
||||
{
|
||||
if (ast_node->as<ASTSubquery>() || ast_node->as<ASTTableIdentifier>())
|
||||
return prepared_sets->findSubquery(ast_node->getTreeHash());
|
||||
|
||||
auto tree_hash = ast_node->getTreeHash();
|
||||
const auto & sets = prepared_sets->getSetsFromTuple();
|
||||
auto it = sets.find(tree_hash);
|
||||
if (it == sets.end())
|
||||
return nullptr;
|
||||
|
||||
for (const auto & future_set : it->second)
|
||||
if (types_match(future_set->getTypes()))
|
||||
return future_set;
|
||||
}
|
||||
else
|
||||
{
|
||||
const auto * node_without_alias = getNodeWithoutAlias(dag_node);
|
||||
if (node_without_alias->column)
|
||||
{
|
||||
auto future_set = tryGetSetFromDAGNode(node_without_alias);
|
||||
if (types_match(future_set->getTypes()))
|
||||
return future_set;
|
||||
}
|
||||
}
|
||||
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
RPNBuilderFunctionTreeNode RPNBuilderTreeNode::toFunctionNode() const
|
||||
{
|
||||
if (!isFunction())
|
||||
|
@ -116,11 +116,6 @@ public:
|
||||
/// Try get prepared set from node that match data types
|
||||
FutureSetPtr tryGetPreparedSet(const DataTypes & data_types) const;
|
||||
|
||||
/// Try get prepared set from node that match indexes mapping and data types
|
||||
FutureSetPtr tryGetPreparedSet(
|
||||
const std::vector<MergeTreeSetIndex::KeyTuplePositionMapping> & indexes_mapping,
|
||||
const DataTypes & data_types) const;
|
||||
|
||||
/** Convert node to function node.
|
||||
* Node must be function before calling these method, otherwise exception is thrown.
|
||||
*/
|
||||
|
@ -150,9 +150,20 @@ static IMergeTreeDataPart::Checksums checkDataPart(

if (data_part_storage.exists(IMergeTreeDataPart::SERIALIZATION_FILE_NAME))
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, read_settings, std::nullopt, std::nullopt);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
try
{
auto serialization_file = data_part_storage.readFile(IMergeTreeDataPart::SERIALIZATION_FILE_NAME, read_settings, std::nullopt, std::nullopt);
SerializationInfo::Settings settings{ratio_of_defaults, false};
serialization_infos = SerializationInfoByName::readJSON(columns_txt, settings, *serialization_file);
}
catch (const Poco::Exception & ex)
{
throw Exception(ErrorCodes::CORRUPTED_DATA, "Failed to load {}, with error {}", IMergeTreeDataPart::SERIALIZATION_FILE_NAME, ex.message());
}
catch (...)
{
throw;
}
}
|
||||
|
||||
auto get_serialization = [&serialization_infos](const auto & column)
|
||||
|
@ -102,6 +102,8 @@
|
||||
#include <IO/Operators.h>
|
||||
#include <IO/ConnectionTimeouts.h>
|
||||
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
#include <memory>
|
||||
#include <filesystem>
|
||||
#include <optional>
|
||||
@ -298,6 +300,7 @@ NamesAndTypesList StorageDistributed::getVirtuals() const
|
||||
NameAndTypePair("_sample_factor", std::make_shared<DataTypeFloat64>()),
|
||||
NameAndTypePair("_part_offset", std::make_shared<DataTypeUInt64>()),
|
||||
NameAndTypePair("_row_exists", std::make_shared<DataTypeUInt8>()),
|
||||
NameAndTypePair(BlockNumberColumn::name, BlockNumberColumn::type),
|
||||
NameAndTypePair("_shard_num", std::make_shared<DataTypeUInt32>()), /// deprecated
|
||||
};
|
||||
}
|
||||
|
@ -33,6 +33,7 @@
|
||||
#include <Backups/IBackup.h>
|
||||
#include <Backups/RestorerFromBackup.h>
|
||||
#include <Disks/TemporaryFileOnDisk.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
|
||||
#include <cassert>
|
||||
#include <chrono>
|
||||
@ -45,6 +46,8 @@
|
||||
namespace DB
|
||||
{
|
||||
|
||||
CompressionCodecPtr getCompressionCodecDelta(UInt8 delta_bytes_size);
|
||||
|
||||
namespace ErrorCodes
|
||||
{
|
||||
extern const int TIMEOUT_EXCEEDED;
|
||||
@ -452,10 +455,15 @@ void LogSink::writeData(const NameAndTypePair & name_and_type, const IColumn & c
|
||||
const auto & data_file = *data_file_it->second;
|
||||
const auto & columns = metadata_snapshot->getColumns();
|
||||
|
||||
CompressionCodecPtr compression;
|
||||
if (name_and_type.name == BlockNumberColumn::name)
|
||||
compression = BlockNumberColumn::compression_codec;
|
||||
else
|
||||
compression = columns.getCodecOrDefault(name_and_type.name);
|
||||
|
||||
it = streams.try_emplace(data_file.name, storage.disk, data_file.path,
|
||||
storage.file_checker.getFileSize(data_file.path),
|
||||
columns.getCodecOrDefault(name_and_type.name),
|
||||
storage.max_compress_block_size).first;
|
||||
compression, storage.max_compress_block_size).first;
|
||||
}
|
||||
|
||||
auto & stream = it->second;
|
||||
|
@ -2231,10 +2231,13 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
|
||||
|
||||
results.emplace_back(part->name, true, "Checksums recounted and written to disk.");
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
catch (...)
|
||||
{
|
||||
if (isRetryableException(std::current_exception()))
|
||||
throw;
|
||||
|
||||
tryLogCurrentException(log, __PRETTY_FUNCTION__);
|
||||
results.emplace_back(part->name, false, "Check of part finished with error: '" + ex.message() + "'");
|
||||
results.emplace_back(part->name, false, "Check of part finished with error: '" + getCurrentExceptionMessage(false) + "'");
|
||||
}
|
||||
}
|
||||
else
|
||||
@ -2244,9 +2247,12 @@ CheckResults StorageMergeTree::checkData(const ASTPtr & query, ContextPtr local_
|
||||
checkDataPart(part, true);
|
||||
results.emplace_back(part->name, true, "");
|
||||
}
|
||||
catch (const Exception & ex)
|
||||
catch (...)
|
||||
{
|
||||
results.emplace_back(part->name, false, ex.message());
|
||||
if (isRetryableException(std::current_exception()))
|
||||
throw;
|
||||
|
||||
results.emplace_back(part->name, false, getCurrentExceptionMessage(false));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
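The CHECK TABLE handlers above now report whatever text getCurrentExceptionMessage() produces, which is why the integration test later in this diff stops comparing the full message and only asserts on the part name and status. The query shape that test uses:

-- one row per part; the third column carries the (now variable) error text
CHECK TABLE non_replicated_mt SETTINGS check_query_single_value_result = 0;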
@ -1,5 +1,6 @@
|
||||
#include <Storages/StorageSnapshot.h>
|
||||
#include <Storages/LightweightDeleteDescription.h>
|
||||
#include <Storages/BlockNumberColumn.h>
|
||||
#include <Storages/IStorage.h>
|
||||
#include <DataTypes/ObjectUtils.h>
|
||||
#include <DataTypes/NestedUtils.h>
|
||||
@ -24,6 +25,7 @@ void StorageSnapshot::init()
|
||||
|
||||
if (storage.hasLightweightDeletedMask())
|
||||
system_columns[LightweightDeleteDescription::FILTER_COLUMN.name] = LightweightDeleteDescription::FILTER_COLUMN.type;
|
||||
system_columns[BlockNumberColumn::name] = BlockNumberColumn::type;
|
||||
}
|
||||
|
||||
NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const
|
||||
|
@ -210,3 +210,12 @@ def download_performance_build(check_name, reports_path, result_path):
|
||||
result_path,
|
||||
lambda x: x.endswith("performance.tar.zst"),
|
||||
)
|
||||
|
||||
|
||||
def download_fuzzers(check_name, reports_path, result_path):
|
||||
download_builds_filter(
|
||||
check_name,
|
||||
reports_path,
|
||||
result_path,
|
||||
lambda x: x.endswith(("_fuzzer", ".dict", ".options", "_seed_corpus.zip")),
|
||||
)
|
||||
|
@ -282,6 +282,7 @@ CI_CONFIG = CiConfig(
|
||||
"SQLancer (debug)": TestConfig("package_debug"),
|
||||
"Sqllogic test (release)": TestConfig("package_release"),
|
||||
"SQLTest": TestConfig("package_release"),
|
||||
"libFuzzer tests": TestConfig("fuzzers"),
|
||||
},
|
||||
)
|
||||
CI_CONFIG.validate()
|
||||
|
@ -141,16 +141,6 @@ STATUS_ICON_MAP = defaultdict(
|
||||
)
|
||||
|
||||
|
||||
def update_pr_status_label(pr: PullRequest, status: str) -> None:
|
||||
new_label = "pr-status-" + STATUS_ICON_MAP[status]
|
||||
for label in pr.get_labels():
|
||||
if label.name == new_label:
|
||||
return
|
||||
if label.name.startswith("pr-status-"):
|
||||
pr.remove_from_labels(label.name)
|
||||
pr.add_to_labels(new_label)
|
||||
|
||||
|
||||
def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
|
||||
"""It adds or updates the comment status to all Pull Requests but for release
|
||||
one, so the method does nothing for simple pushes and pull requests with
|
||||
@ -190,8 +180,6 @@ def set_status_comment(commit: Commit, pr_info: PRInfo) -> None:
|
||||
comment = ic
|
||||
break
|
||||
|
||||
update_pr_status_label(pr, get_worst_state(statuses))
|
||||
|
||||
if comment is None:
|
||||
pr.create_issue_comment(comment_body)
|
||||
return
|
||||
|
190
tests/ci/libfuzzer_test_check.py
Normal file
@ -0,0 +1,190 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import sys
|
||||
import atexit
|
||||
import zipfile
|
||||
from pathlib import Path
|
||||
from typing import List
|
||||
|
||||
from github import Github
|
||||
|
||||
from build_download_helper import download_fuzzers
|
||||
from clickhouse_helper import (
|
||||
CiLogsCredentials,
|
||||
)
|
||||
from commit_status_helper import (
|
||||
RerunHelper,
|
||||
get_commit,
|
||||
update_mergeable_check,
|
||||
)
|
||||
from docker_pull_helper import DockerImage, get_image_with_version
|
||||
|
||||
from env_helper import TEMP_PATH, REPO_COPY, REPORTS_PATH
|
||||
from get_robot_token import get_best_robot_token
|
||||
from pr_info import PRInfo
|
||||
from report import TestResults
|
||||
|
||||
from stopwatch import Stopwatch
|
||||
|
||||
from tee_popen import TeePopen
|
||||
|
||||
|
||||
NO_CHANGES_MSG = "Nothing to run"
|
||||
|
||||
|
||||
def get_additional_envs(check_name, run_by_hash_num, run_by_hash_total):
|
||||
result = []
|
||||
if "DatabaseReplicated" in check_name:
|
||||
result.append("USE_DATABASE_REPLICATED=1")
|
||||
if "DatabaseOrdinary" in check_name:
|
||||
result.append("USE_DATABASE_ORDINARY=1")
|
||||
if "wide parts enabled" in check_name:
|
||||
result.append("USE_POLYMORPHIC_PARTS=1")
|
||||
if "ParallelReplicas" in check_name:
|
||||
result.append("USE_PARALLEL_REPLICAS=1")
|
||||
if "s3 storage" in check_name:
|
||||
result.append("USE_S3_STORAGE_FOR_MERGE_TREE=1")
|
||||
if "analyzer" in check_name:
|
||||
result.append("USE_NEW_ANALYZER=1")
|
||||
|
||||
if run_by_hash_total != 0:
|
||||
result.append(f"RUN_BY_HASH_NUM={run_by_hash_num}")
|
||||
result.append(f"RUN_BY_HASH_TOTAL={run_by_hash_total}")
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def get_run_command(
|
||||
fuzzers_path: Path,
|
||||
repo_path: Path,
|
||||
result_path: Path,
|
||||
kill_timeout: int,
|
||||
additional_envs: List[str],
|
||||
ci_logs_args: str,
|
||||
image: DockerImage,
|
||||
) -> str:
|
||||
additional_options = ["--hung-check"]
|
||||
additional_options.append("--print-time")
|
||||
|
||||
additional_options_str = (
|
||||
'-e ADDITIONAL_OPTIONS="' + " ".join(additional_options) + '"'
|
||||
)
|
||||
|
||||
envs = [
|
||||
f"-e MAX_RUN_TIME={int(0.9 * kill_timeout)}",
|
||||
# a static link, don't use S3_URL or S3_DOWNLOAD
|
||||
'-e S3_URL="https://s3.amazonaws.com/clickhouse-datasets"',
|
||||
]
|
||||
|
||||
envs += [f"-e {e}" for e in additional_envs]
|
||||
|
||||
env_str = " ".join(envs)
|
||||
|
||||
return (
|
||||
f"docker run "
|
||||
f"{ci_logs_args} "
|
||||
f"--workdir=/fuzzers "
|
||||
f"--volume={fuzzers_path}:/fuzzers "
|
||||
f"--volume={repo_path}/tests:/usr/share/clickhouse-test "
|
||||
f"--volume={result_path}:/test_output "
|
||||
f"--cap-add=SYS_PTRACE {env_str} {additional_options_str} {image}"
|
||||
)
|
||||
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("check_name")
|
||||
parser.add_argument("kill_timeout", type=int)
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def main():
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
|
||||
stopwatch = Stopwatch()
|
||||
|
||||
temp_path = Path(TEMP_PATH)
|
||||
repo_path = Path(REPO_COPY)
|
||||
reports_path = REPORTS_PATH
|
||||
|
||||
args = parse_args()
|
||||
check_name = args.check_name
|
||||
kill_timeout = args.kill_timeout
|
||||
|
||||
gh = Github(get_best_robot_token(), per_page=100)
|
||||
pr_info = PRInfo()
|
||||
commit = get_commit(gh, pr_info.sha)
|
||||
atexit.register(update_mergeable_check, gh, pr_info, check_name)
|
||||
|
||||
temp_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
if "RUN_BY_HASH_NUM" in os.environ:
|
||||
run_by_hash_num = int(os.getenv("RUN_BY_HASH_NUM", "0"))
|
||||
run_by_hash_total = int(os.getenv("RUN_BY_HASH_TOTAL", "0"))
|
||||
check_name_with_group = (
|
||||
check_name + f" [{run_by_hash_num + 1}/{run_by_hash_total}]"
|
||||
)
|
||||
else:
|
||||
run_by_hash_num = 0
|
||||
run_by_hash_total = 0
|
||||
check_name_with_group = check_name
|
||||
|
||||
rerun_helper = RerunHelper(commit, check_name_with_group)
|
||||
if rerun_helper.is_already_finished_by_status():
|
||||
logging.info("Check is already finished according to github status, exiting")
|
||||
sys.exit(0)
|
||||
|
||||
docker_image = get_image_with_version(reports_path, "clickhouse/libfuzzer")
|
||||
|
||||
fuzzers_path = temp_path / "fuzzers"
|
||||
fuzzers_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
download_fuzzers(check_name, reports_path, fuzzers_path)
|
||||
|
||||
for file in os.listdir(fuzzers_path):
|
||||
if file.endswith("_fuzzer"):
|
||||
os.chmod(fuzzers_path / file, 0o777)
|
||||
elif file.endswith("_seed_corpus.zip"):
|
||||
corpus_path = fuzzers_path / (file.removesuffix("_seed_corpus.zip") + ".in")
|
||||
zipfile.ZipFile(fuzzers_path / file, "r").extractall(corpus_path)
|
||||
|
||||
result_path = temp_path / "result_path"
|
||||
result_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
run_log_path = result_path / "run.log"
|
||||
|
||||
additional_envs = get_additional_envs(
|
||||
check_name, run_by_hash_num, run_by_hash_total
|
||||
)
|
||||
|
||||
ci_logs_credentials = CiLogsCredentials(Path(temp_path) / "export-logs-config.sh")
|
||||
ci_logs_args = ci_logs_credentials.get_docker_arguments(
|
||||
pr_info, stopwatch.start_time_str, check_name
|
||||
)
|
||||
|
||||
run_command = get_run_command(
|
||||
fuzzers_path,
|
||||
repo_path,
|
||||
result_path,
|
||||
kill_timeout,
|
||||
additional_envs,
|
||||
ci_logs_args,
|
||||
docker_image,
|
||||
)
|
||||
logging.info("Going to run libFuzzer tests: %s", run_command)
|
||||
|
||||
with TeePopen(run_command, run_log_path) as process:
|
||||
retcode = process.wait()
|
||||
if retcode == 0:
|
||||
logging.info("Run successfully")
|
||||
else:
|
||||
logging.info("Run failed")
|
||||
|
||||
sys.exit(0)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
@ -55,6 +55,7 @@ class TeePopen:
|
||||
stderr=STDOUT,
|
||||
stdout=PIPE,
|
||||
bufsize=1,
|
||||
errors="backslashreplace",
|
||||
)
|
||||
if self.timeout is not None and self.timeout > 0:
|
||||
t = Thread(target=self._check_timeout)
|
||||
|
28
tests/fuzz/build.sh
Executable file
@ -0,0 +1,28 @@
#!/bin/bash -eu

# copy fuzzer options and dictionaries
cp $SRC/tests/fuzz/*.dict $OUT/
cp $SRC/tests/fuzz/*.options $OUT/

# prepare corpus dirs
mkdir -p $BIN/tests/fuzz/lexer_fuzzer.in/
mkdir -p $BIN/tests/fuzz/select_parser_fuzzer.in/
mkdir -p $BIN/tests/fuzz/create_parser_fuzzer.in/
mkdir -p $BIN/tests/fuzz/execute_query_fuzzer.in/

# prepare corpus
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/lexer_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/select_parser_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/create_parser_fuzzer.in/
cp $SRC/tests/queries/0_stateless/*.sql $BIN/tests/fuzz/execute_query_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/lexer_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/select_parser_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/create_parser_fuzzer.in/
cp $SRC/tests/queries/1_stateful/*.sql $BIN/tests/fuzz/execute_query_fuzzer.in/

# build corpus archives
cd $BIN/tests/fuzz
for dir in *_fuzzer.in; do
    fuzzer=$(basename $dir .in)
    zip -rj "$OUT/${fuzzer}_seed_corpus.zip" "${dir}/"
done
|
@ -1096,6 +1096,7 @@ def test_stop_other_host_during_backup(kill):
|
||||
if status == "BACKUP_CREATED":
|
||||
node1.query("DROP TABLE tbl ON CLUSTER 'cluster' SYNC")
|
||||
node1.query(f"RESTORE TABLE tbl ON CLUSTER 'cluster' FROM {backup_name}")
|
||||
node1.query("SYSTEM SYNC REPLICA tbl")
|
||||
assert node1.query("SELECT * FROM tbl ORDER BY x") == TSV([3, 5])
|
||||
elif status == "BACKUP_FAILED":
|
||||
assert not os.path.exists(
|
||||
|
@ -109,21 +109,15 @@ def test_check_normal_table_corruption(started_cluster):
|
||||
|
||||
corrupt_data_part_on_disk(node1, "non_replicated_mt", "201902_1_1_0")
|
||||
|
||||
assert (
|
||||
node1.query(
|
||||
"CHECK TABLE non_replicated_mt",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
).strip()
|
||||
== "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 25."
|
||||
)
|
||||
assert node1.query(
|
||||
"CHECK TABLE non_replicated_mt",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
|
||||
|
||||
assert (
|
||||
node1.query(
|
||||
"CHECK TABLE non_replicated_mt",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
).strip()
|
||||
== "201902_1_1_0\t0\tCannot read all data. Bytes read: 2. Bytes expected: 25."
|
||||
)
|
||||
assert node1.query(
|
||||
"CHECK TABLE non_replicated_mt",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
).strip().split("\t")[0:2] == ["201902_1_1_0", "0"]
|
||||
|
||||
node1.query(
|
||||
"INSERT INTO non_replicated_mt VALUES (toDate('2019-01-01'), 1, 10), (toDate('2019-01-01'), 2, 12)"
|
||||
@ -141,13 +135,10 @@ def test_check_normal_table_corruption(started_cluster):
|
||||
|
||||
remove_checksums_on_disk(node1, "non_replicated_mt", "201901_2_2_0")
|
||||
|
||||
assert (
|
||||
node1.query(
|
||||
"CHECK TABLE non_replicated_mt PARTITION 201901",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
)
|
||||
== "201901_2_2_0\t0\tCheck of part finished with error: \\'Cannot read all data. Bytes read: 2. Bytes expected: 25.\\'\n"
|
||||
)
|
||||
assert node1.query(
|
||||
"CHECK TABLE non_replicated_mt PARTITION 201901",
|
||||
settings={"check_query_single_value_result": 0},
|
||||
).strip().split("\t")[0:2] == ["201901_2_2_0", "0"]
|
||||
|
||||
|
||||
def test_check_replicated_table_simple(started_cluster):
|
||||
|
@ -95,7 +95,14 @@ def create_tables(cluster, table_name):
|
||||
return "60\t0\t59\t1770\n"
|
||||
|
||||
|
||||
def test_read_equally_from_each_replica(start_cluster):
|
||||
@pytest.mark.parametrize(
|
||||
"prefer_localhost_replica",
|
||||
[
|
||||
pytest.param(0),
|
||||
pytest.param(1),
|
||||
],
|
||||
)
|
||||
def test_read_equally_from_each_replica(start_cluster, prefer_localhost_replica):
|
||||
"""create and populate table in special way (see create_table()),
|
||||
so parallel replicas will read equal number of rows from each replica
|
||||
"""
|
||||
@ -110,7 +117,7 @@ def test_read_equally_from_each_replica(start_cluster):
|
||||
f"SELECT count(), min(key), max(key), sum(key) FROM {table_name}_d",
|
||||
settings={
|
||||
"allow_experimental_parallel_reading_from_replicas": 2,
|
||||
"prefer_localhost_replica": 0,
|
||||
"prefer_localhost_replica": prefer_localhost_replica,
|
||||
"max_parallel_replicas": 3,
|
||||
"use_hedged_requests": 0,
|
||||
},
|
||||
|
@ -106,10 +106,18 @@ def create_tables(cluster, table_name):
|
||||
pytest.param("test_single_shard_multiple_replicas", 3, 0),
|
||||
pytest.param("test_single_shard_multiple_replicas", 4, 0),
|
||||
pytest.param("test_single_shard_multiple_replicas", 10, 0),
|
||||
pytest.param("test_single_shard_multiple_replicas", 2, 1),
|
||||
pytest.param("test_single_shard_multiple_replicas", 3, 1),
|
||||
pytest.param("test_single_shard_multiple_replicas", 4, 1),
|
||||
pytest.param("test_single_shard_multiple_replicas", 10, 1),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 2, 0),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 3, 0),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 4, 0),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 10, 0),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 2, 1),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 3, 1),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 4, 1),
|
||||
pytest.param("test_multiple_shards_multiple_replicas", 10, 1),
|
||||
],
|
||||
)
|
||||
def test_parallel_replicas_over_distributed(
|
||||
|
@ -2,3 +2,6 @@
|
||||
2018-01-01 1 1
|
||||
2018-01-01 2 2
|
||||
2018-01-01 2 2
|
||||
== (Replicas) Test optimize ==
|
||||
d2 1 0
|
||||
d4 1 0
|
||||
|
@ -1,10 +1,30 @@
|
||||
set optimize_on_insert = 0;
|
||||
|
||||
drop table if exists tab_00577;
|
||||
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 1, vertical_merge_algorithm_min_columns_to_activate = 0;
|
||||
create table tab_00577 (date Date, version UInt64, val UInt64) engine = ReplacingMergeTree(version) partition by date order by date settings enable_vertical_merge_algorithm = 1,
|
||||
vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
|
||||
min_bytes_for_wide_part = 0;
|
||||
insert into tab_00577 values ('2018-01-01', 2, 2), ('2018-01-01', 1, 1);
|
||||
insert into tab_00577 values ('2018-01-01', 0, 0);
|
||||
select * from tab_00577 order by version;
|
||||
OPTIMIZE TABLE tab_00577;
|
||||
OPTIMIZE TABLE tab_00577 FINAL CLEANUP;
|
||||
select * from tab_00577;
|
||||
drop table tab_00577;
|
||||
|
||||
|
||||
DROP TABLE IF EXISTS testCleanupR1;
|
||||
CREATE TABLE testCleanupR1 (uid String, version UInt32, is_deleted UInt8)
|
||||
ENGINE = ReplicatedReplacingMergeTree('/clickhouse/{database}/tables/test_cleanup/', 'r1', version, is_deleted)
|
||||
ORDER BY uid SETTINGS enable_vertical_merge_algorithm = 1, vertical_merge_algorithm_min_rows_to_activate = 0, vertical_merge_algorithm_min_columns_to_activate = 0, min_rows_for_wide_part = 0,
|
||||
min_bytes_for_wide_part = 0;
|
||||
INSERT INTO testCleanupR1 (*) VALUES ('d1', 1, 0),('d2', 1, 0),('d3', 1, 0),('d4', 1, 0);
|
||||
INSERT INTO testCleanupR1 (*) VALUES ('d3', 2, 1);
|
||||
INSERT INTO testCleanupR1 (*) VALUES ('d1', 2, 1);
|
||||
SYSTEM SYNC REPLICA testCleanupR1; -- Avoid "Cannot select parts for optimization: Entry for part all_2_2_0 hasn't been read from the replication log yet"
|
||||
|
||||
OPTIMIZE TABLE testCleanupR1 FINAL CLEANUP;
|
||||
|
||||
-- Only d3 to d5 remain
|
||||
SELECT '== (Replicas) Test optimize ==';
|
||||
SELECT * FROM testCleanupR1 order by uid;
|
||||
DROP TABLE IF EXISTS testCleanupR1
|
@ -1,4 +1,3 @@
|
||||
1
|
||||
0
|
||||
---
|
||||
1
|
||||
|
@ -3,13 +3,13 @@
|
||||
|
||||
SYSTEM DROP QUERY CACHE;
|
||||
|
||||
-- rand() is non-deterministic, with default settings no entry in the query cache should be created
|
||||
SELECT COUNT(rand(1)) SETTINGS use_query_cache = true;
|
||||
-- rand() is non-deterministic, the query is rejected by default
|
||||
SELECT COUNT(rand(1)) SETTINGS use_query_cache = true; -- { serverError CANNOT_USE_QUERY_CACHE_WITH_NONDETERMINISTIC_FUNCTIONS }
|
||||
SELECT COUNT(*) FROM system.query_cache;
|
||||
|
||||
SELECT '---';
|
||||
|
||||
-- But an entry can be forced using a setting
|
||||
-- Force caching using a setting
|
||||
SELECT COUNT(RAND(1)) SETTINGS use_query_cache = true, query_cache_store_results_of_queries_with_nondeterministic_functions = true;
|
||||
SELECT COUNT(*) FROM system.query_cache;
|
||||
|
||||
|
@ -0,0 +1,41 @@
*** BEFORE MUTATION BEFORE MERGE ***
1 1 1 all_1_1_0
2 2 1 all_1_1_0
3 3 1 all_1_1_0
4 4 2 all_2_2_0
5 5 2 all_2_2_0
6 6 2 all_2_2_0
*** AFTER MUTATION BEFORE MERGE ***
1 0 1 all_1_1_0_3
2 0 1 all_1_1_0_3
3 0 1 all_1_1_0_3
4 4 2 all_2_2_0_3
5 5 2 all_2_2_0_3
6 6 2 all_2_2_0_3
*** AFTER MUTATION AFTER MERGE ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
7 7 4 all_4_4_0
8 8 4 all_4_4_0
9 9 4 all_4_4_0
*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***
1 0 1 all_1_4_2_3
2 0 1 all_1_4_2_3
3 0 1 all_1_4_2_3
4 4 2 all_1_4_2_3
5 5 2 all_1_4_2_3
6 6 2 all_1_4_2_3
7 7 4 all_1_4_2_3
8 8 4 all_1_4_2_3
9 9 4 all_1_4_2_3
32
tests/queries/0_stateless/02668_column_block_number.sql
Normal file
@ -0,0 +1,32 @@
DROP TABLE IF EXISTS test;

CREATE TABLE test (id UInt32, a UInt32) ENGINE = MergeTree ORDER BY id SETTINGS allow_experimental_block_number_column = true;

INSERT INTO test(id,a) VALUES (1,1),(2,2),(3,3);
INSERT INTO test(id,a) VALUES (4,4),(5,5),(6,6);

SELECT '*** BEFORE MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;

set mutations_sync=1;
ALTER TABLE test UPDATE a=0 WHERE id<4;

SELECT '*** AFTER MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;

OPTIMIZE TABLE test FINAL;

SELECT '*** AFTER MUTATION AFTER MERGE ***';
SELECT *,_block_number,_part from test ORDER BY id;

INSERT INTO test(id,a) VALUES (7,7),(8,8),(9,9);

SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***';
SELECT *,_block_number,_part from test ORDER BY id;

OPTIMIZE TABLE test FINAL;

SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***';
SELECT *,_block_number,_part from test ORDER BY id;

DROP TABLE test;
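The test above checks that the persisted _block_number virtual column, enabled through allow_experimental_block_number_column, keeps the block number assigned at insert time across mutations and merges. As a hypothetical follow-up query (valid only before the final DROP TABLE in the test, and not part of the commit), the column can be filtered like any other column:

SELECT id, a, _block_number, _part
FROM test
WHERE _block_number = 1 -- rows from the first insert keep block number 1 even after OPTIMIZE ... FINAL
ORDER BY id;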
@ -0,0 +1,41 @@
*** BEFORE MUTATION BEFORE MERGE ***
1 1 1 all_1_1_0
2 2 1 all_1_1_0
3 3 1 all_1_1_0
4 4 2 all_2_2_0
5 5 2 all_2_2_0
6 6 2 all_2_2_0
*** AFTER MUTATION BEFORE MERGE ***
1 0 1 all_1_1_0_3
2 0 1 all_1_1_0_3
3 0 1 all_1_1_0_3
4 4 2 all_2_2_0_3
5 5 2 all_2_2_0_3
6 6 2 all_2_2_0_3
*** AFTER MUTATION AFTER MERGE ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***
1 0 1 all_1_2_1_3
2 0 1 all_1_2_1_3
3 0 1 all_1_2_1_3
4 4 2 all_1_2_1_3
5 5 2 all_1_2_1_3
6 6 2 all_1_2_1_3
7 7 4 all_4_4_0
8 8 4 all_4_4_0
9 9 4 all_4_4_0
*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***
1 0 1 all_1_4_2_3
2 0 1 all_1_4_2_3
3 0 1 all_1_4_2_3
4 4 2 all_1_4_2_3
5 5 2 all_1_4_2_3
6 6 2 all_1_4_2_3
7 7 4 all_1_4_2_3
8 8 4 all_1_4_2_3
9 9 4 all_1_4_2_3
@ -0,0 +1,36 @@
DROP TABLE IF EXISTS test;

CREATE TABLE test (id UInt32, a UInt32) ENGINE = MergeTree ORDER BY id SETTINGS allow_experimental_block_number_column = true,
vertical_merge_algorithm_min_rows_to_activate = 1,
vertical_merge_algorithm_min_columns_to_activate = 0,
min_rows_for_wide_part = 1,
min_bytes_for_wide_part = 1;

INSERT INTO test(id,a) VALUES (1,1),(2,2),(3,3);
INSERT INTO test(id,a) VALUES (4,4),(5,5),(6,6);

SELECT '*** BEFORE MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;

set mutations_sync=1;
ALTER TABLE test UPDATE a=0 WHERE id<4;

SELECT '*** AFTER MUTATION BEFORE MERGE ***';
SELECT id,a,_block_number,_part from test ORDER BY id;

OPTIMIZE TABLE test FINAL;

SELECT '*** AFTER MUTATION AFTER MERGE ***';
SELECT *,_block_number,_part from test ORDER BY id;

INSERT INTO test(id,a) VALUES (7,7),(8,8),(9,9);

SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK ***';
SELECT *,_block_number,_part from test ORDER BY id;

OPTIMIZE TABLE test FINAL;

SELECT '*** AFTER MUTATION AFTER MERGE , NEW BLOCK MERGED ***';
SELECT *,_block_number,_part from test ORDER BY id;

DROP TABLE test;
@ -0,0 +1,19 @@
0
1
2
3
*** AFTER FIRST OPTIMIZE ***
0 1
1 2
1 2
2 3
3 3
*** AFTER SECOND OPTIMIZE ***
0 1
1 2
1 2
2 3
3 3
4 4
5 4
6 4
@ -0,0 +1,18 @@
DROP TABLE IF EXISTS t;
CREATE TABLE t (x UInt8, PROJECTION p (SELECT x GROUP BY x)) ENGINE = MergeTree ORDER BY () SETTINGS allow_experimental_block_number_column=true;
INSERT INTO t VALUES (0);
INSERT INTO t VALUES (1),(1);
INSERT INTO t VALUES (2),(3);

SELECT x FROM t GROUP BY x;
OPTIMIZE TABLE t FINAL;

SELECT '*** AFTER FIRST OPTIMIZE ***';
SELECT x,_block_number FROM t;

INSERT INTO t VALUES (4), (5), (6);
OPTIMIZE TABLE t FINAL;
SELECT '*** AFTER SECOND OPTIMIZE ***';
SELECT x,_block_number FROM t;

DROP TABLE t;
@ -0,0 +1,88 @@
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Projection + Before ORDER BY))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 1-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
CreatingSets (Create sets before main query execution)
Expression ((Project names + Projection))
ReadFromMergeTree (default.test_table)
Indexes:
PrimaryKey
Keys:
id
value
Condition: and((id in (-Inf, 10]), (value in 5-element set))
Parts: 1/1
Granules: 1/1
@ -0,0 +1,24 @@
DROP TABLE IF EXISTS test_table;
CREATE TABLE test_table
(
    id UInt64,
    value UInt64
) ENGINE=MergeTree ORDER BY (id, value);

INSERT INTO test_table SELECT number, number FROM numbers(10);

SET allow_experimental_analyzer = 0;

EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT 5);
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT '5');
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toUInt8(number) FROM numbers(5));
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toString(number) FROM numbers(5));

SET allow_experimental_analyzer = 1;

EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT 5);
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT '5');
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toUInt8(number) FROM numbers(5));
EXPLAIN indexes = 1 SELECT id FROM test_table WHERE id <= 10 AND value IN (SELECT toString(number) FROM numbers(5));

DROP TABLE test_table;
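The last test verifies that a primary-key condition of the form "value IN (subquery)" is turned into a usable index condition with both the old and the new analyzer (allow_experimental_analyzer = 0/1), which is what the "Condition:" lines in the reference output above show. A minimal sketch of how such a plan is inspected, assuming the test_table defined in the test before it is dropped (not part of the commit):

EXPLAIN indexes = 1
SELECT id
FROM test_table
WHERE id <= 10 AND value IN (SELECT 5);
-- The Indexes / PrimaryKey section is expected to report a condition similar to:
--   and((id in (-Inf, 10]), (value in 1-element set))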