Merge branch 'master' into change-log-level-clickhouse-local

This commit is contained in:
Alexey Milovidov 2024-08-07 17:21:26 +02:00
commit 2b78b3f90a
149 changed files with 1736 additions and 711 deletions

2
.gitmodules vendored
View File

@ -341,7 +341,7 @@
url = https://github.com/graphitemaster/incbin.git
[submodule "contrib/usearch"]
path = contrib/usearch
url = https://github.com/unum-cloud/usearch.git
url = https://github.com/ClickHouse/usearch.git
[submodule "contrib/SimSIMD"]
path = contrib/SimSIMD
url = https://github.com/ashvardanian/SimSIMD.git

View File

@ -71,7 +71,6 @@ add_contrib (zlib-ng-cmake zlib-ng)
add_contrib (bzip2-cmake bzip2)
add_contrib (minizip-ng-cmake minizip-ng)
add_contrib (snappy-cmake snappy)
add_contrib (rocksdb-cmake rocksdb)
add_contrib (thrift-cmake thrift)
# parquet/arrow/orc
add_contrib (arrow-cmake arrow) # requires: snappy, thrift, double-conversion
@ -148,6 +147,7 @@ add_contrib (hive-metastore-cmake hive-metastore) # requires: thrift, avro, arro
add_contrib (cppkafka-cmake cppkafka)
add_contrib (libpqxx-cmake libpqxx)
add_contrib (libpq-cmake libpq)
add_contrib (rocksdb-cmake rocksdb) # requires: jemalloc, snappy, zlib, lz4, zstd, liburing
add_contrib (nuraft-cmake NuRaft)
add_contrib (fast_float-cmake fast_float)
add_contrib (idna-cmake idna)

2
contrib/qpl vendored

@ -1 +1 @@
Subproject commit d4715e0e79896b85612158e135ee1a85f3b3e04d
Subproject commit c2ced94c53c1ee22191201a59878e9280bc9b9b8

View File

@ -4,7 +4,6 @@ set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
set (EFFICIENT_WAIT OFF)
set (BLOCK_ON_FAULT ON)
set (LOG_HW_INIT OFF)
set (SANITIZE_MEMORY OFF)
set (SANITIZE_THREADS OFF)
@ -16,16 +15,20 @@ function(GetLibraryVersion _content _outputVar)
SET(${_outputVar} ${CMAKE_MATCH_1} PARENT_SCOPE)
endfunction()
set (QPL_VERSION 1.2.0)
set (QPL_VERSION 1.6.0)
message(STATUS "Intel QPL version: ${QPL_VERSION}")
# There are 5 source subdirectories under $QPL_SRC_DIR: isal, c_api, core-sw, middle-layer, c_api.
# Generate 8 library targets: middle_layer_lib, isal, isal_asm, qplcore_px, qplcore_avx512, qplcore_sw_dispatcher, core_iaa, middle_layer_lib.
# There are 5 source subdirectories under $QPL_SRC_DIR: c_api, core-iaa, core-sw, middle-layer and isal.
# Generate 8 library targets: qpl_c_api, core_iaa, qplcore_px, qplcore_avx512, qplcore_sw_dispatcher, middle_layer_lib, isal and isal_asm,
# which are then combined into static or shared qpl.
# Output ch_contrib::qpl by linking with 8 library targets.
# The qpl submodule comes with its own version of isal. It contains code which does not exist in upstream isal. It would be nice to link
# only upstream isal (ch_contrib::isal) but at this point we can't.
# Note, QPL has integrated a customized version of ISA-L to meet specific needs.
# This version has been significantly modified and there are no plans to maintain compatibility with the upstream version
# or upgrade the current copy.
## cmake/CompileOptions.cmake and automatic wrappers generation
# ==========================================================================
# Copyright (C) 2022 Intel Corporation
@ -442,6 +445,7 @@ function(generate_unpack_kernel_arrays current_directory PLATFORMS_LIST)
endforeach()
endfunction()
# [SUBDIR]isal
enable_language(ASM_NASM)
@ -479,7 +483,6 @@ set(ISAL_ASM_SRC ${QPL_SRC_DIR}/isal/igzip/igzip_body.asm
${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_04.asm
${QPL_SRC_DIR}/isal/igzip/igzip_set_long_icf_fg_06.asm
${QPL_SRC_DIR}/isal/igzip/igzip_multibinary.asm
${QPL_SRC_DIR}/isal/igzip/stdmac.asm
${QPL_SRC_DIR}/isal/crc/crc_multibinary.asm
${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8.asm
${QPL_SRC_DIR}/isal/crc/crc32_gzip_refl_by8_02.asm
@ -505,7 +508,6 @@ set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
# Setting external and internal interfaces for ISA-L library
target_include_directories(isal
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/isal/include>
PRIVATE ${QPL_SRC_DIR}/isal/include
PUBLIC ${QPL_SRC_DIR}/isal/igzip)
set_target_properties(isal PROPERTIES
@ -617,12 +619,9 @@ target_compile_options(qplcore_sw_dispatcher
# [SUBDIR]core-iaa
file(GLOB HW_PATH_SRC ${QPL_SRC_DIR}/core-iaa/sources/aecs/*.c
${QPL_SRC_DIR}/core-iaa/sources/aecs/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.c
${QPL_SRC_DIR}/core-iaa/sources/driver_loader/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.c
${QPL_SRC_DIR}/core-iaa/sources/descriptors/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/bit_rev.c)
${QPL_SRC_DIR}/core-iaa/sources/*.c)
# Create library
add_library(core_iaa OBJECT ${HW_PATH_SRC})
@ -634,31 +633,27 @@ target_include_directories(core_iaa
PRIVATE ${UUID_DIR}
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/include>
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/core-iaa/sources/include>
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include> # status.h in own_checkers.h
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/sources/c_api> # own_checkers.h
PRIVATE $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include> # status.h in own_checkers.h
PRIVATE $<TARGET_PROPERTY:qpl_c_api,INTERFACE_INCLUDE_DIRECTORIES> # for own_checkers.h
PRIVATE $<TARGET_PROPERTY:qplcore_sw_dispatcher,INTERFACE_INCLUDE_DIRECTORIES>)
target_compile_features(core_iaa PRIVATE c_std_11)
target_compile_definitions(core_iaa PRIVATE QPL_BADARG_CHECK
PRIVATE $<$<BOOL:${BLOCK_ON_FAULT}>: BLOCK_ON_FAULT_ENABLED>
PRIVATE $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>
PRIVATE $<$<BOOL:${DYNAMIC_LOADING_LIBACCEL_CONFIG}>:DYNAMIC_LOADING_LIBACCEL_CONFIG>)
# [SUBDIR]middle-layer
file(GLOB MIDDLE_LAYER_SRC
${QPL_SRC_DIR}/middle-layer/analytics/*.cpp
${QPL_SRC_DIR}/middle-layer/c_wrapper/*.cpp
${QPL_SRC_DIR}/middle-layer/checksum/*.cpp
${QPL_SRC_DIR}/middle-layer/accelerator/*.cpp
${QPL_SRC_DIR}/middle-layer/analytics/*.cpp
${QPL_SRC_DIR}/middle-layer/common/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*/*.cpp
${QPL_SRC_DIR}/middle-layer/compression/*/*/*.cpp
${QPL_SRC_DIR}/middle-layer/dispatcher/*.cpp
${QPL_SRC_DIR}/middle-layer/other/*.cpp
${QPL_SRC_DIR}/middle-layer/util/*.cpp
${QPL_SRC_DIR}/middle-layer/inflate/*.cpp
${QPL_SRC_DIR}/core-iaa/sources/accelerator/*.cpp) # todo
${QPL_SRC_DIR}/middle-layer/util/*.cpp)
add_library(middle_layer_lib OBJECT
${MIDDLE_LAYER_SRC})
@ -667,6 +662,7 @@ set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
$<TARGET_OBJECTS:middle_layer_lib>)
target_compile_options(middle_layer_lib
PRIVATE $<$<C_COMPILER_ID:GNU,Clang>:$<$<CONFIG:Release>:-O3;-U_FORTIFY_SOURCE;-D_FORTIFY_SOURCE=2>>
PRIVATE ${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS})
target_compile_definitions(middle_layer_lib
@ -682,6 +678,7 @@ target_include_directories(middle_layer_lib
PRIVATE ${UUID_DIR}
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/middle-layer>
PUBLIC $<TARGET_PROPERTY:_qpl,INTERFACE_INCLUDE_DIRECTORIES>
PRIVATE $<TARGET_PROPERTY:qpl_c_api,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:qplcore_sw_dispatcher,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:isal,INTERFACE_INCLUDE_DIRECTORIES>
PUBLIC $<TARGET_PROPERTY:core_iaa,INTERFACE_INCLUDE_DIRECTORIES>)
@ -689,31 +686,54 @@ target_include_directories(middle_layer_lib
target_compile_definitions(middle_layer_lib PUBLIC -DQPL_LIB)
# [SUBDIR]c_api
file(GLOB_RECURSE QPL_C_API_SRC
${QPL_SRC_DIR}/c_api/*.c
${QPL_SRC_DIR}/c_api/*.cpp)
file(GLOB QPL_C_API_SRC
${QPL_SRC_DIR}/c_api/compression_operations/*.c
${QPL_SRC_DIR}/c_api/compression_operations/*.cpp
${QPL_SRC_DIR}/c_api/filter_operations/*.cpp
${QPL_SRC_DIR}/c_api/legacy_hw_path/*.c
${QPL_SRC_DIR}/c_api/legacy_hw_path/*.cpp
${QPL_SRC_DIR}/c_api/other_operations/*.cpp
${QPL_SRC_DIR}/c_api/serialization/*.cpp
${QPL_SRC_DIR}/c_api/*.cpp)
add_library(qpl_c_api OBJECT ${QPL_C_API_SRC})
target_include_directories(qpl_c_api
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/c_api/>
PUBLIC $<BUILD_INTERFACE:${QPL_SRC_DIR}/include/> $<INSTALL_INTERFACE:include>
PRIVATE $<TARGET_PROPERTY:middle_layer_lib,INTERFACE_INCLUDE_DIRECTORIES>)
set_target_properties(qpl_c_api PROPERTIES
$<$<C_COMPILER_ID:GNU,Clang>:C_STANDARD 17
CXX_STANDARD 17)
target_compile_options(qpl_c_api
PRIVATE $<$<C_COMPILER_ID:GNU,Clang>:$<$<CONFIG:Release>:-O3;-U_FORTIFY_SOURCE;-D_FORTIFY_SOURCE=2>>
PRIVATE $<$<COMPILE_LANG_AND_ID:CXX,GNU,Clang>:${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS}>)
target_compile_definitions(qpl_c_api
PUBLIC -DQPL_BADARG_CHECK # own_checkers.h
PUBLIC -DQPL_LIB # needed for middle_layer_lib
PUBLIC $<$<BOOL:${LOG_HW_INIT}>:LOG_HW_INIT>) # needed for middle_layer_lib
set_property(GLOBAL APPEND PROPERTY QPL_LIB_DEPS
$<TARGET_OBJECTS:qpl_c_api>)
# Final _qpl target
get_property(LIB_DEPS GLOBAL PROPERTY QPL_LIB_DEPS)
add_library(_qpl STATIC ${QPL_C_API_SRC} ${LIB_DEPS})
add_library(_qpl STATIC ${LIB_DEPS})
target_include_directories(_qpl
PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/> $<INSTALL_INTERFACE:include>
PRIVATE $<TARGET_PROPERTY:middle_layer_lib,INTERFACE_INCLUDE_DIRECTORIES>
PRIVATE $<BUILD_INTERFACE:${QPL_SRC_DIR}/c_api>)
PUBLIC $<BUILD_INTERFACE:${QPL_PROJECT_DIR}/include/> $<INSTALL_INTERFACE:include>)
target_compile_options(_qpl
PRIVATE ${QPL_LINUX_TOOLCHAIN_CPP_EMBEDDED_FLAGS})
target_compile_definitions(_qpl
PRIVATE -DQPL_LIB
PRIVATE -DQPL_BADARG_CHECK
PRIVATE $<$<BOOL:${DYNAMIC_LOADING_LIBACCEL_CONFIG}>:DYNAMIC_LOADING_LIBACCEL_CONFIG>
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal)
PRIVATE ch_contrib::accel-config)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"

2
contrib/rocksdb vendored

@ -1 +1 @@
Subproject commit 49ce8a1064dd1ad89117899839bf136365e49e79
Subproject commit 5f003e4a22d2e48e37c98d9620241237cd30dd24

View File

@ -5,36 +5,38 @@ if (NOT ENABLE_ROCKSDB OR NO_SSE3_OR_HIGHER) # assumes SSE4.2 and PCLMUL
return()
endif()
# not in original build system, otherwise xxHash.cc fails to compile with ClickHouse C++23 default
set (CMAKE_CXX_STANDARD 20)
# Always disable jemalloc for rocksdb by default because it introduces non-standard jemalloc APIs
option(WITH_JEMALLOC "build with JeMalloc" OFF)
option(WITH_LIBURING "build with liburing" OFF) # TODO could try to enable this conditionally, depending on ClickHouse's ENABLE_LIBURING
# ClickHouse cannot be compiled without snappy, lz4, zlib, zstd
option(WITH_SNAPPY "build with SNAPPY" ON)
option(WITH_LZ4 "build with lz4" ON)
option(WITH_ZLIB "build with zlib" ON)
option(WITH_ZSTD "build with zstd" ON)
if(WITH_SNAPPY)
if (ENABLE_JEMALLOC)
add_definitions(-DROCKSDB_JEMALLOC -DJEMALLOC_NO_DEMANGLE)
list (APPEND THIRDPARTY_LIBS ch_contrib::jemalloc)
endif ()
if (ENABLE_LIBURING)
add_definitions(-DROCKSDB_IOURING_PRESENT)
list (APPEND THIRDPARTY_LIBS ch_contrib::liburing)
endif ()
if (WITH_SNAPPY)
add_definitions(-DSNAPPY)
list(APPEND THIRDPARTY_LIBS ch_contrib::snappy)
endif()
if(WITH_ZLIB)
if (WITH_ZLIB)
add_definitions(-DZLIB)
list(APPEND THIRDPARTY_LIBS ch_contrib::zlib)
endif()
if(WITH_LZ4)
if (WITH_LZ4)
add_definitions(-DLZ4)
list(APPEND THIRDPARTY_LIBS ch_contrib::lz4)
endif()
if(WITH_ZSTD)
if (WITH_ZSTD)
add_definitions(-DZSTD)
list(APPEND THIRDPARTY_LIBS ch_contrib::zstd)
endif()
@ -88,6 +90,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/cache/sharded_cache.cc
${ROCKSDB_SOURCE_DIR}/cache/tiered_secondary_cache.cc
${ROCKSDB_SOURCE_DIR}/db/arena_wrapped_db_iter.cc
${ROCKSDB_SOURCE_DIR}/db/attribute_group_iterator_impl.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_contents.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_fetcher.cc
${ROCKSDB_SOURCE_DIR}/db/blob/blob_file_addition.cc
@ -104,6 +107,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/blob/prefetch_buffer_collection.cc
${ROCKSDB_SOURCE_DIR}/db/builder.cc
${ROCKSDB_SOURCE_DIR}/db/c.cc
${ROCKSDB_SOURCE_DIR}/db/coalescing_iterator.cc
${ROCKSDB_SOURCE_DIR}/db/column_family.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction.cc
${ROCKSDB_SOURCE_DIR}/db/compaction/compaction_iterator.cc
@ -124,6 +128,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_write.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_compaction_flush.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_files.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_follower.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_open.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_debug.cc
${ROCKSDB_SOURCE_DIR}/db/db_impl/db_impl_experimental.cc
@ -181,6 +186,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/env/env_encryption.cc
${ROCKSDB_SOURCE_DIR}/env/file_system.cc
${ROCKSDB_SOURCE_DIR}/env/file_system_tracer.cc
${ROCKSDB_SOURCE_DIR}/env/fs_on_demand.cc
${ROCKSDB_SOURCE_DIR}/env/fs_remap.cc
${ROCKSDB_SOURCE_DIR}/env/mock_env.cc
${ROCKSDB_SOURCE_DIR}/env/unique_id_gen.cc
@ -368,6 +374,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/persistent_cache/volatile_tier_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/cache_simulator.cc
${ROCKSDB_SOURCE_DIR}/utilities/simulator_cache/sim_cache.cc
${ROCKSDB_SOURCE_DIR}/utilities/table_properties_collectors/compact_for_tiering_collector.cc
${ROCKSDB_SOURCE_DIR}/utilities/table_properties_collectors/compact_on_deletion_collector.cc
${ROCKSDB_SOURCE_DIR}/utilities/trace/file_trace_reader_writer.cc
${ROCKSDB_SOURCE_DIR}/utilities/trace/replayer_impl.cc
@ -388,6 +395,7 @@ set(SOURCES
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_prepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn.cc
${ROCKSDB_SOURCE_DIR}/utilities/transactions/write_unprepared_txn_db.cc
${ROCKSDB_SOURCE_DIR}/utilities/types_util.cc
${ROCKSDB_SOURCE_DIR}/utilities/ttl/db_ttl_impl.cc
${ROCKSDB_SOURCE_DIR}/utilities/wal_filter.cc
${ROCKSDB_SOURCE_DIR}/utilities/write_batch_with_index/write_batch_with_index.cc
@ -418,14 +426,18 @@ if(HAS_ARMV8_CRC)
endif(HAS_ARMV8_CRC)
list(APPEND SOURCES
"${ROCKSDB_SOURCE_DIR}/port/port_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/env_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc"
"${ROCKSDB_SOURCE_DIR}/env/io_posix.cc")
${ROCKSDB_SOURCE_DIR}/port/port_posix.cc
${ROCKSDB_SOURCE_DIR}/env/env_posix.cc
${ROCKSDB_SOURCE_DIR}/env/fs_posix.cc
${ROCKSDB_SOURCE_DIR}/env/io_posix.cc)
add_library(_rocksdb ${SOURCES})
add_library(ch_contrib::rocksdb ALIAS _rocksdb)
target_link_libraries(_rocksdb PRIVATE ${THIRDPARTY_LIBS} ${SYSTEM_LIBS})
# Not in the native build system but useful anyways:
# Make all functions in xxHash.h inline. Beneficial for performance: https://github.com/Cyan4973/xxHash/tree/v0.8.2#build-modifiers
target_compile_definitions (_rocksdb PRIVATE XXH_INLINE_ALL)
# SYSTEM is required to overcome some issues
target_include_directories(_rocksdb SYSTEM BEFORE INTERFACE "${ROCKSDB_SOURCE_DIR}/include")

2
contrib/usearch vendored

@ -1 +1 @@
Subproject commit 955c6f9c11adfd89c912e0d1643d160b4e9e543f
Subproject commit 30810452bec5d3d3aa0931bb5d761e2f09aa6356

View File

@ -28,12 +28,14 @@ RUN echo "TSAN_OPTIONS='verbosity=1000 halt_on_error=1 abort_on_error=1 history_
RUN echo "UBSAN_OPTIONS='print_stacktrace=1 max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1 max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "LSAN_OPTIONS='suppressions=/usr/share/clickhouse-test/config/lsan_suppressions.txt max_allocation_size_mb=32768'" >> /etc/environment
RUN echo "ASAN_OPTIONS='halt_on_error=1 abort_on_error=1'" >> /etc/environment
# Sanitizer options for current shell (not current, but the one that will be spawned on "docker run")
# (but w/o verbosity for TSAN, otherwise test.reference will not match)
ENV TSAN_OPTIONS='halt_on_error=1 abort_on_error=1 history_size=7 memory_limit_mb=46080 second_deadlock_stack=1 max_allocation_size_mb=32768'
ENV UBSAN_OPTIONS='print_stacktrace=1 max_allocation_size_mb=32768'
ENV MSAN_OPTIONS='abort_on_error=1 poison_in_dtor=1 max_allocation_size_mb=32768'
ENV LSAN_OPTIONS='max_allocation_size_mb=32768'
ENV ASAN_OPTIONS='halt_on_error=1 abort_on_error=1'
# for external_symbolizer_path, and also ensure that llvm-symbolizer really
# exists (since you don't want to fallback to addr2line, it is very slow)

View File

@ -193,53 +193,60 @@ function fuzz
kill -0 $server_pid
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
thread apply all backtrace full
info registers
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
thread apply all backtrace full
info registers
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info, and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
gdb -batch -command script.gdb -p $server_pid &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info, and then send SIGCONT, wait for it (up to send_timeout, 300s)
time clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'" ||:
# Check connectivity after we attach gdb, because it might cause the server
# to freeze, and the fuzzer will fail. In debug build, it can take a lot of time.
for _ in {1..180}
do
if clickhouse-client --query "select 1"
then
break
fi
sleep 1
done
kill -0 $server_pid # This checks that it is our server that is started and not some other one
fi
# Check connectivity after we attach gdb, because it might cause the server
# to freeze, and the fuzzer will fail. In debug build, it can take a lot of time.
for _ in {1..180}
do
if clickhouse-client --query "select 1"
then
break
fi
sleep 1
done
kill -0 $server_pid # This checks that it is our server that is started and not some other one
echo 'Server started and responded.'
setup_logs_replication
@ -264,8 +271,13 @@ quit
# The fuzzer_pid belongs to the timeout process.
actual_fuzzer_pid=$(ps -o pid= --ppid "$fuzzer_pid")
echo "Attaching gdb to the fuzzer itself"
gdb -batch -command script.gdb -p $actual_fuzzer_pid &
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
echo "Attaching gdb to the fuzzer itself"
gdb -batch -command script.gdb -p $actual_fuzzer_pid &
fi
# Wait for the fuzzer to complete.
# Note that the 'wait || ...' thing is required so that the script doesn't

View File

@ -5,47 +5,53 @@ source /utils.lib
function attach_gdb_to_clickhouse()
{
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
# shellcheck disable=SC2016
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
info registers
p "top 1 KiB of the stack:"
p/x *(uint64_t[128]*)"'$sp'"
maintenance info sections
thread apply all backtrace full
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
IS_ASAN=$(clickhouse-client --query "SELECT count() FROM system.build_options WHERE name = 'CXX_FLAGS' AND position('sanitize=address' IN value)")
if [[ "$IS_ASAN" = "1" ]];
then
echo "ASAN build detected. Not using gdb since it disables LeakSanitizer detections"
else
# Set follow-fork-mode to parent, because we attach to clickhouse-server, not to watchdog
# and clickhouse-server can do fork-exec, for example, to run some bridge.
# Do not set nostop noprint for all signals, because some it may cause gdb to hang,
# explicitly ignore non-fatal signals that are used by server.
# Number of SIGRTMIN can be determined only in runtime.
RTMIN=$(kill -l SIGRTMIN)
# shellcheck disable=SC2016
echo "
set follow-fork-mode parent
handle SIGHUP nostop noprint pass
handle SIGINT nostop noprint pass
handle SIGQUIT nostop noprint pass
handle SIGPIPE nostop noprint pass
handle SIGTERM nostop noprint pass
handle SIGUSR1 nostop noprint pass
handle SIGUSR2 nostop noprint pass
handle SIG$RTMIN nostop noprint pass
info signals
continue
backtrace full
info registers
p "top 1 KiB of the stack:"
p/x *(uint64_t[128]*)"'$sp'"
maintenance info sections
thread apply all backtrace full
disassemble /s
up
disassemble /s
up
disassemble /s
p \"done\"
detach
quit
" > script.gdb
# FIXME Hung check may work incorrectly because of attached gdb
# We cannot attach another gdb to get stacktraces if some queries hung
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
run_with_retry 60 clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'"
# FIXME Hung check may work incorrectly because of attached gdb
# We cannot attach another gdb to get stacktraces if some queries hung
gdb -batch -command script.gdb -p "$(cat /var/run/clickhouse-server/clickhouse-server.pid)" | ts '%Y-%m-%d %H:%M:%S' >> /test_output/gdb.log &
sleep 5
# gdb will send SIGSTOP, spend some time loading debug info and then send SIGCONT, wait for it (up to send_timeout, 300s)
run_with_retry 60 clickhouse-client --query "SELECT 'Connected to clickhouse-server after attaching gdb'"
fi
}
# vi: ft=bash

View File

@ -174,7 +174,7 @@ do
done
setup_logs_replication
attach_gdb_to_clickhouse || true # FIXME: to not break old builds, clean on 2023-09-01
attach_gdb_to_clickhouse
function fn_exists() {
declare -F "$1" > /dev/null;

View File

@ -308,7 +308,8 @@ function collect_query_and_trace_logs()
{
for table in query_log trace_log metric_log
do
clickhouse-local --config-file=/etc/clickhouse-server/config.xml --only-system-tables -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst ||:
# Don't ignore errors here, it leads to ignore sanitizer reports when running clickhouse-local
clickhouse-local --config-file=/etc/clickhouse-server/config.xml --only-system-tables -q "select * from system.$table format TSVWithNamesAndTypes" | zstd --threads=0 > /test_output/$table.tsv.zst
done
}

View File

@ -4,4 +4,5 @@ ARG FROM_TAG=latest
FROM clickhouse/test-base:$FROM_TAG
COPY run.sh /
CMD ["/bin/bash", "/run.sh"]
RUN chmod +x run.sh
ENTRYPOINT ["/run.sh"]

View File

@ -1,5 +1,27 @@
#!/bin/bash
set -x
# Need to keep error from tests after `tee`. Otherwise we don't alert on asan errors
set -o pipefail
set -e
timeout 40m gdb -q -ex 'set print inferior-events off' -ex 'set confirm off' -ex 'set print thread-events off' -ex run -ex bt -ex quit --args ./unit_tests_dbms --gtest_output='json:test_output/test_result.json' | tee test_output/test_result.txt
if [ "$#" -ne 1 ]; then
echo "Expected exactly one argument"
exit 1
fi
if [ "$1" = "GDB" ];
then
timeout 40m \
gdb -q -ex "set print inferior-events off" -ex "set confirm off" -ex "set print thread-events off" -ex run -ex bt -ex quit --args \
./unit_tests_dbms --gtest_output='json:test_output/test_result.json' \
| tee test_output/test_result.txt
elif [ "$1" = "NO_GDB" ];
then
timeout 40m \
./unit_tests_dbms --gtest_output='json:test_output/test_result.json' \
| tee test_output/test_result.txt
else
echo "Unknown argument: $1"
exit 1
fi

View File

@ -4629,8 +4629,8 @@ Default Value: 5.
## memory_overcommit_ratio_denominator {#memory_overcommit_ratio_denominator}
It represents soft memory limit in case when hard limit is reached on user level.
This value is used to compute overcommit ratio for the query.
It represents the soft memory limit when the hard limit is reached on the global level.
This value is used to compute the overcommit ratio for the query.
Zero means skip the query.
Read more about [memory overcommit](memory-overcommit.md).
@ -4646,8 +4646,8 @@ Default value: `5000000`.
## memory_overcommit_ratio_denominator_for_user {#memory_overcommit_ratio_denominator_for_user}
It represents soft memory limit in case when hard limit is reached on global level.
This value is used to compute overcommit ratio for the query.
It represents the soft memory limit when the hard limit is reached on the user level.
This value is used to compute the overcommit ratio for the query.
Zero means skip the query.
Read more about [memory overcommit](memory-overcommit.md).
@ -5609,6 +5609,18 @@ Minimal size of block to compress in CROSS JOIN. Zero value means - disable this
Default value: `1GiB`.
## restore_replace_external_engines_to_null
For testing purposes. Replaces all external engines to Null to not initiate external connections.
Default value: `False`
## restore_replace_external_table_functions_to_null
For testing purposes. Replaces all external table functions to Null to not initiate external connections.
Default value: `False`
## disable_insertion_and_mutation
Disable all insert and mutations (alter table update / alter table delete / alter table drop partition). Set to true, can make this node focus on reading queries.

View File

@ -28,39 +28,39 @@ A client application to interact with clickhouse-keeper by its native protocol.
Connected to ZooKeeper at [::1]:9181 with session_id 137
/ :) ls
keeper foo bar
/ :) cd keeper
/ :) cd 'keeper'
/keeper :) ls
api_version
/keeper :) cd api_version
/keeper :) cd 'api_version'
/keeper/api_version :) ls
/keeper/api_version :) cd xyz
/keeper/api_version :) cd 'xyz'
Path /keeper/api_version/xyz does not exist
/keeper/api_version :) cd ../../
/ :) ls
keeper foo bar
/ :) get keeper/api_version
/ :) get 'keeper/api_version'
2
```
## Commands {#clickhouse-keeper-client-commands}
- `ls [path]` -- Lists the nodes for the given path (default: cwd)
- `cd [path]` -- Changes the working path (default `.`)
- `exists <path>` -- Returns `1` if node exists, `0` otherwise
- `set <path> <value> [version]` -- Updates the node's value. Only updates if version matches (default: -1)
- `create <path> <value> [mode]` -- Creates new node with the set value
- `touch <path>` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
- `get <path>` -- Returns the node's value
- `rm <path> [version]` -- Removes the node only if version matches (default: -1)
- `rmr <path>` -- Recursively deletes path. Confirmation required
- `ls '[path]'` -- Lists the nodes for the given path (default: cwd)
- `cd '[path]'` -- Changes the working path (default `.`)
- `exists '<path>'` -- Returns `1` if node exists, `0` otherwise
- `set '<path>' <value> [version]` -- Updates the node's value. Only updates if version matches (default: -1)
- `create '<path>' <value> [mode]` -- Creates new node with the set value
- `touch '<path>'` -- Creates new node with an empty string as value. Doesn't throw an exception if the node already exists
- `get '<path>'` -- Returns the node's value
- `rm '<path>' [version]` -- Removes the node only if version matches (default: -1)
- `rmr '<path>'` -- Recursively deletes path. Confirmation required
- `flwc <command>` -- Executes four-letter-word command
- `help` -- Prints this message
- `get_direct_children_number [path]` -- Get numbers of direct children nodes under a specific path
- `get_all_children_number [path]` -- Get all numbers of children nodes under a specific path
- `get_stat [path]` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> [path]` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `get_direct_children_number '[path]'` -- Get numbers of direct children nodes under a specific path
- `get_all_children_number '[path]'` -- Get all numbers of children nodes under a specific path
- `get_stat '[path]'` -- Returns the node's stat (default `.`)
- `find_super_nodes <threshold> '[path]'` -- Finds nodes with number of children larger than some threshold for the given path (default `.`)
- `delete_stale_backups` -- Deletes ClickHouse nodes used for backups that are now inactive
- `find_big_family [path] [n]` -- Returns the top n nodes with the biggest family in the subtree (default path = `.` and n = 10)
- `sync <path>` -- Synchronizes node between processes and leader
- `sync '<path>'` -- Synchronizes node between processes and leader
- `reconfig <add|remove|set> "<arg>" [version]` -- Reconfigure Keeper cluster. See https://clickhouse.com/docs/en/guides/sre/keeper/clickhouse-keeper#reconfiguration

View File

@ -10,7 +10,7 @@ Calculates a concatenated string from a group of strings, optionally separated b
**Syntax**
``` sql
groupConcat(expression [, delimiter] [, limit]);
groupConcat[(delimiter [, limit])](expression);
```
**Arguments**
@ -20,7 +20,7 @@ groupConcat(expression [, delimiter] [, limit]);
- `limit` — A positive [integer](../../../sql-reference/data-types/int-uint.md) specifying the maximum number of elements to concatenate. If more elements are present, excess elements are ignored. This parameter is optional.
:::note
If delimiter is specified without limit, it must be the first parameter following the expression. If both delimiter and limit are specified, delimiter must precede limit.
If delimiter is specified without limit, it must be the first parameter. If both delimiter and limit are specified, delimiter must precede limit.
:::
**Returned value**
@ -61,7 +61,7 @@ This concatenates all names into one continuous string without any separator.
Query:
``` sql
SELECT groupConcat(Name, ', ', 2) FROM Employees;
SELECT groupConcat(', ')(Name) FROM Employees;
```
Result:
@ -78,7 +78,7 @@ This output shows the names separated by a comma followed by a space.
Query:
``` sql
SELECT groupConcat(Name, ', ', 2) FROM Employees;
SELECT groupConcat(', ', 2)(Name) FROM Employees;
```
Result:

View File

@ -36,9 +36,10 @@ If you anticipate frequent deletes, consider using a [custom partitioning key](/
## Limitations of lightweight `DELETE`
### Lightweight `DELETE`s do not work with projections
### Lightweight `DELETE`s with projections
Currently, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation and may require the projection to be rebuilt, negatively affecting `DELETE` performance.
By default, `DELETE` does not work for tables with projections. This is because rows in a projection may be affected by a `DELETE` operation and may require the projection to be rebuilt, negatively affecting `DELETE` performance.
However, there is an option to change this behavior. By changing setting `lightweight_mutation_projection_mode = 'drop'`, deletes will work with projections.
## Performance considerations when using lightweight `DELETE`

View File

@ -5,6 +5,7 @@
#include <Common/assert_cast.h>
#include <Common/FieldVisitorToString.h>
#include <Common/SipHash.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteHelpers.h>
@ -162,6 +163,7 @@ QueryTreeNodePtr ConstantNode::cloneImpl() const
ASTPtr ConstantNode::toASTImpl(const ConvertToASTOptions & options) const
{
const auto & constant_value_literal = constant_value->getValue();
const auto & constant_value_type = constant_value->getType();
auto constant_value_ast = std::make_shared<ASTLiteral>(constant_value_literal);
if (!options.add_cast_for_constants)
@ -169,7 +171,25 @@ ASTPtr ConstantNode::toASTImpl(const ConvertToASTOptions & options) const
if (requiresCastCall())
{
auto constant_type_name_ast = std::make_shared<ASTLiteral>(constant_value->getType()->getName());
/** Value for DateTime64 is Decimal64, which is serialized as a string literal.
* If we serialize it as is, DateTime64 would be parsed from that string literal, which can be incorrect.
* For example, DateTime64 cannot be parsed from the short value, like '1', while it's a valid Decimal64 value.
* It could also lead to ambiguous parsing because we don't know if the string literal represents a date or a Decimal64 literal.
* For this reason, we use a string literal representing a date instead of a Decimal64 literal.
*/
if (WhichDataType(constant_value_type->getTypeId()).isDateTime64())
{
const auto * date_time_type = typeid_cast<const DataTypeDateTime64 *>(constant_value_type.get());
DecimalField<Decimal64> decimal_value;
if (constant_value_literal.tryGet<DecimalField<Decimal64>>(decimal_value))
{
WriteBufferFromOwnString ostr;
writeDateTimeText(decimal_value.getValue(), date_time_type->getScale(), ostr, date_time_type->getTimeZone());
constant_value_ast = std::make_shared<ASTLiteral>(ostr.str());
}
}
auto constant_type_name_ast = std::make_shared<ASTLiteral>(constant_value_type->getName());
return makeASTFunction("_CAST", std::move(constant_value_ast), std::move(constant_type_name_ast));
}

View File

@ -46,7 +46,7 @@ public:
return;
const auto & storage_snapshot = table_node ? table_node->getStorageSnapshot() : table_function_node->getStorageSnapshot();
if (!storage->isVirtualColumn(column.name, storage_snapshot->getMetadataForQuery()))
if (!storage->isVirtualColumn(column.name, storage_snapshot->metadata))
return;
auto function_node = std::make_shared<FunctionNode>("shardNum");

View File

@ -243,7 +243,7 @@ public:
}
/// Clear and finish queue
void clearAndFinish()
void clearAndFinish() noexcept
{
{
std::lock_guard lock(queue_mutex);

View File

@ -8,7 +8,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerFairPolicy, Factory)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerFairPolicy, Factory)
{
ResourceTest t;
@ -17,7 +19,7 @@ TEST(SchedulerFairPolicy, Factory)
EXPECT_TRUE(dynamic_cast<FairPolicy *>(fair.get()) != nullptr);
}
TEST(SchedulerFairPolicy, FairnessWeights)
TEST(DISABLED_SchedulerFairPolicy, FairnessWeights)
{
ResourceTest t;
@ -41,7 +43,7 @@ TEST(SchedulerFairPolicy, FairnessWeights)
t.consumed("B", 20);
}
TEST(SchedulerFairPolicy, Activation)
TEST(DISABLED_SchedulerFairPolicy, Activation)
{
ResourceTest t;
@ -77,7 +79,7 @@ TEST(SchedulerFairPolicy, Activation)
t.consumed("B", 10);
}
TEST(SchedulerFairPolicy, FairnessMaxMin)
TEST(DISABLED_SchedulerFairPolicy, FairnessMaxMin)
{
ResourceTest t;
@ -101,7 +103,7 @@ TEST(SchedulerFairPolicy, FairnessMaxMin)
t.consumed("A", 20);
}
TEST(SchedulerFairPolicy, HierarchicalFairness)
TEST(DISABLED_SchedulerFairPolicy, HierarchicalFairness)
{
ResourceTest t;

View File

@ -8,7 +8,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerPriorityPolicy, Factory)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerPriorityPolicy, Factory)
{
ResourceTest t;
@ -17,7 +19,7 @@ TEST(SchedulerPriorityPolicy, Factory)
EXPECT_TRUE(dynamic_cast<PriorityPolicy *>(prio.get()) != nullptr);
}
TEST(SchedulerPriorityPolicy, Priorities)
TEST(DISABLED_SchedulerPriorityPolicy, Priorities)
{
ResourceTest t;
@ -51,7 +53,7 @@ TEST(SchedulerPriorityPolicy, Priorities)
t.consumed("C", 0);
}
TEST(SchedulerPriorityPolicy, Activation)
TEST(DISABLED_SchedulerPriorityPolicy, Activation)
{
ResourceTest t;
@ -92,7 +94,7 @@ TEST(SchedulerPriorityPolicy, Activation)
t.consumed("C", 0);
}
TEST(SchedulerPriorityPolicy, SinglePriority)
TEST(DISABLED_SchedulerPriorityPolicy, SinglePriority)
{
ResourceTest t;

View File

@ -10,7 +10,9 @@ using namespace DB;
using ResourceTest = ResourceTestClass;
TEST(SchedulerThrottlerConstraint, LeakyBucketConstraint)
/// Tests disabled because of leaks in the test themselves: https://github.com/ClickHouse/ClickHouse/issues/67678
TEST(DISABLED_SchedulerThrottlerConstraint, LeakyBucketConstraint)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -40,7 +42,7 @@ TEST(SchedulerThrottlerConstraint, LeakyBucketConstraint)
t.consumed("A", 10);
}
TEST(SchedulerThrottlerConstraint, Unlimited)
TEST(DISABLED_SchedulerThrottlerConstraint, Unlimited)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -57,7 +59,7 @@ TEST(SchedulerThrottlerConstraint, Unlimited)
}
}
TEST(SchedulerThrottlerConstraint, Pacing)
TEST(DISABLED_SchedulerThrottlerConstraint, Pacing)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -77,7 +79,7 @@ TEST(SchedulerThrottlerConstraint, Pacing)
}
}
TEST(SchedulerThrottlerConstraint, BucketFilling)
TEST(DISABLED_SchedulerThrottlerConstraint, BucketFilling)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -111,7 +113,7 @@ TEST(SchedulerThrottlerConstraint, BucketFilling)
t.consumed("A", 3);
}
TEST(SchedulerThrottlerConstraint, PeekAndAvgLimits)
TEST(DISABLED_SchedulerThrottlerConstraint, PeekAndAvgLimits)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();
@ -139,7 +141,7 @@ TEST(SchedulerThrottlerConstraint, PeekAndAvgLimits)
}
}
TEST(SchedulerThrottlerConstraint, ThrottlerAndFairness)
TEST(DISABLED_SchedulerThrottlerConstraint, ThrottlerAndFairness)
{
ResourceTest t;
EventQueue::TimePoint start = std::chrono::system_clock::now();

View File

@ -14,20 +14,21 @@
/// because of broken getauxval() [1].
///
/// [1]: https://github.com/ClickHouse/ClickHouse/pull/33957
TEST(Common, LSan)
TEST(SanitizerDeathTest, LSan)
{
int sanitizers_exit_code = 1;
ASSERT_EXIT({
std::thread leak_in_thread([]()
EXPECT_DEATH(
{
void * leak = malloc(4096);
ASSERT_NE(leak, nullptr);
});
leak_in_thread.join();
std::thread leak_in_thread(
[]()
{
void * leak = malloc(4096);
ASSERT_NE(leak, nullptr);
});
leak_in_thread.join();
__lsan_do_leak_check();
}, ::testing::ExitedWithCode(sanitizers_exit_code), ".*LeakSanitizer: detected memory leaks.*");
__lsan_do_leak_check();
},
".*LeakSanitizer: detected memory leaks.*");
}
#endif

View File

@ -5,19 +5,19 @@
# If you want really small size of the resulted binary, just link with fuzz_compression and clickhouse_common_io
clickhouse_add_executable (compressed_buffer_fuzzer compressed_buffer_fuzzer.cpp)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms)
target_link_libraries (compressed_buffer_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (lz4_decompress_fuzzer lz4_decompress_fuzzer.cpp)
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4)
target_link_libraries (lz4_decompress_fuzzer PUBLIC dbms ch_contrib::lz4 clickhouse_functions)
clickhouse_add_executable (delta_decompress_fuzzer delta_decompress_fuzzer.cpp)
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms)
target_link_libraries (delta_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (double_delta_decompress_fuzzer double_delta_decompress_fuzzer.cpp)
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms)
target_link_libraries (double_delta_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (encrypted_decompress_fuzzer encrypted_decompress_fuzzer.cpp)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms)
target_link_libraries (encrypted_decompress_fuzzer PRIVATE dbms clickhouse_functions)
clickhouse_add_executable (gcd_decompress_fuzzer gcd_decompress_fuzzer.cpp)
target_link_libraries (gcd_decompress_fuzzer PRIVATE dbms)
target_link_libraries (gcd_decompress_fuzzer PRIVATE dbms clickhouse_functions)

View File

@ -893,6 +893,8 @@ class IColumn;
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_key_value_pairs_max_pairs_per_row, 1000, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory.", 0) ALIAS(extract_kvp_max_pairs_per_row) \
M(Bool, restore_replace_external_engines_to_null, false, "Replace all the external table engines to Null on restore. Useful for testing purposes", 0) \
M(Bool, restore_replace_external_table_functions_to_null, false, "Replace all table functions to Null on restore. Useful for testing purposes", 0) \
\
\
/* ###################################### */ \

View File

@ -75,6 +75,8 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
},
{"24.8",
{
{"restore_replace_external_table_functions_to_null", false, false, "New setting."},
{"restore_replace_external_engines_to_null", false, false, "New setting."},
{"input_format_json_max_depth", 1000000, 1000, "It was unlimited in previous versions, but that was unsafe."},
{"merge_tree_min_bytes_per_task_for_remote_reading", 4194304, 2097152, "Value is unified with `filesystem_prefetch_min_bytes_for_single_read_task`"},
{"allow_archive_path_syntax", true, true, "Added new setting to allow disabling archive path syntax."},

View File

@ -177,6 +177,11 @@ IMPLEMENT_SETTING_ENUM(LightweightMutationProjectionMode, ErrorCodes::BAD_ARGUME
{{"throw", LightweightMutationProjectionMode::THROW},
{"drop", LightweightMutationProjectionMode::DROP}})
IMPLEMENT_SETTING_ENUM(DeduplicateMergeProjectionMode, ErrorCodes::BAD_ARGUMENTS,
{{"throw", DeduplicateMergeProjectionMode::THROW},
{"drop", DeduplicateMergeProjectionMode::DROP},
{"rebuild", DeduplicateMergeProjectionMode::REBUILD}})
IMPLEMENT_SETTING_AUTO_ENUM(LocalFSReadMethod, ErrorCodes::BAD_ARGUMENTS)
IMPLEMENT_SETTING_ENUM(ParquetVersion, ErrorCodes::BAD_ARGUMENTS,

View File

@ -315,6 +315,15 @@ enum class LightweightMutationProjectionMode : uint8_t
DECLARE_SETTING_ENUM(LightweightMutationProjectionMode)
enum class DeduplicateMergeProjectionMode : uint8_t
{
THROW,
DROP,
REBUILD,
};
DECLARE_SETTING_ENUM(DeduplicateMergeProjectionMode)
DECLARE_SETTING_ENUM(LocalFSReadMethod)
enum class ObjectStorageQueueMode : uint8_t

View File

@ -33,6 +33,16 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
DataTypeAggregateFunction::DataTypeAggregateFunction(AggregateFunctionPtr function_, const DataTypes & argument_types_,
const Array & parameters_, std::optional<size_t> version_)
: function(std::move(function_))
, argument_types(argument_types_)
, parameters(parameters_)
, version(version_)
{
}
String DataTypeAggregateFunction::getFunctionName() const
{
return function->getName();

View File

@ -30,13 +30,7 @@ public:
static constexpr bool is_parametric = true;
DataTypeAggregateFunction(AggregateFunctionPtr function_, const DataTypes & argument_types_,
const Array & parameters_, std::optional<size_t> version_ = std::nullopt)
: function(std::move(function_))
, argument_types(argument_types_)
, parameters(parameters_)
, version(version_)
{
}
const Array & parameters_, std::optional<size_t> version_ = std::nullopt);
size_t getVersion() const;

View File

@ -90,7 +90,9 @@ void IDataType::forEachSubcolumn(
{
auto name = ISerialization::getSubcolumnNameForStream(subpath, prefix_len);
auto subdata = ISerialization::createFromPath(subpath, prefix_len);
callback(subpath, name, subdata);
auto path_copy = subpath;
path_copy.resize(prefix_len);
callback(path_copy, name, subdata);
}
subpath[i].visited = true;
}

View File

@ -8,6 +8,7 @@
#include <DataTypes/DataTypeNothing.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNested.h>
@ -66,6 +67,36 @@ DataTypePtr getBaseTypeOfArray(const DataTypePtr & type)
return last_array ? last_array->getNestedType() : type;
}
DataTypePtr getBaseTypeOfArray(DataTypePtr type, const Names & tuple_elements)
{
auto it = tuple_elements.begin();
while (true)
{
if (const auto * type_array = typeid_cast<const DataTypeArray *>(type.get()))
{
type = type_array->getNestedType();
}
else if (const auto * type_tuple = typeid_cast<const DataTypeTuple *>(type.get()))
{
if (it == tuple_elements.end())
break;
auto pos = type_tuple->tryGetPositionByName(*it);
if (!pos)
break;
++it;
type = type_tuple->getElement(*pos);
}
else
{
break;
}
}
return type;
}
ColumnPtr getBaseColumnOfArray(const ColumnPtr & column)
{
/// Get raw pointers to avoid extra copying of column pointers.

View File

@ -27,6 +27,9 @@ size_t getNumberOfDimensions(const IColumn & column);
/// Returns type of scalars of Array of arbitrary dimensions.
DataTypePtr getBaseTypeOfArray(const DataTypePtr & type);
/// The same as above but takes into account Tuples of Nested.
DataTypePtr getBaseTypeOfArray(DataTypePtr type, const Names & tuple_elements);
/// Returns Array type with requested scalar type and number of dimensions.
DataTypePtr createArrayOfType(DataTypePtr type, size_t num_dimensions);

View File

@ -195,7 +195,7 @@ public:
/// Types of substreams that can have arbitrary name.
static const std::set<Type> named_types;
Type type;
Type type = Type::Regular;
/// The name of a variant element type.
String variant_element_name;
@ -212,6 +212,7 @@ public:
/// Flag, that may help to traverse substream paths.
mutable bool visited = false;
Substream() = default;
Substream(Type type_) : type(type_) {} /// NOLINT
String toString() const;
};

View File

@ -3307,6 +3307,17 @@ void NO_INLINE Aggregator::destroyImpl(Table & table) const
data = nullptr;
});
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
{
if (table.getNullKeyData() != nullptr)
{
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->destroy(table.getNullKeyData() + offsets_of_aggregate_states[i]);
table.getNullKeyData() = nullptr;
}
}
}

View File

@ -951,12 +951,40 @@ namespace
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
void setNullTableEngine(ASTStorage & storage)
{
auto engine_ast = std::make_shared<ASTFunction>();
engine_ast->name = "Null";
engine_ast->no_empty_args = true;
storage.set(storage.engine, engine_ast);
}
}
void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
{
if (create.as_table_function)
{
if (getContext()->getSettingsRef().restore_replace_external_table_functions_to_null)
{
const auto & factory = TableFunctionFactory::instance();
auto properties = factory.tryGetProperties(create.as_table_function->as<ASTFunction>()->name);
if (properties && properties->allow_readonly)
return;
if (!create.storage)
{
auto storage_ast = std::make_shared<ASTStorage>();
create.set(create.storage, storage_ast);
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Storage should not be created yet, it's a bug.");
create.as_table_function = nullptr;
setNullTableEngine(*create.storage);
}
return;
}
if (create.is_dictionary || create.is_ordinary_view || create.is_live_view || create.is_window_view)
return;
@ -1007,6 +1035,13 @@ void InterpreterCreateQuery::setEngine(ASTCreateQuery & create) const
/// Some part of storage definition (such as PARTITION BY) is specified, but ENGINE is not: just set default one.
setDefaultTableEngine(*create.storage, getContext()->getSettingsRef().default_table_engine.value);
}
/// For external tables with restore_replace_external_engine_to_null setting we replace external engines to
/// Null table engine.
else if (getContext()->getSettingsRef().restore_replace_external_engines_to_null)
{
if (StorageFactory::instance().getStorageFeatures(create.storage->engine->name).source_access_type != AccessType::NONE)
setNullTableEngine(*create.storage);
}
return;
}

View File

@ -1158,7 +1158,8 @@ bool TreeRewriterResult::collectUsedColumns(const ASTPtr & query, bool is_select
}
}
has_virtual_shard_num = is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->getMetadataForQuery()) && virtuals->has("_shard_num");
has_virtual_shard_num
= is_remote_storage && storage->isVirtualColumn("_shard_num", storage_snapshot->metadata) && virtuals->has("_shard_num");
}
/// Collect missed object subcolumns

View File

@ -18,6 +18,7 @@
#include <Storages/ColumnsDescription.h>
#include <DataTypes/NestedUtils.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/DataTypeArray.h>
#include <Storages/StorageInMemoryMetadata.h>
@ -35,8 +36,13 @@ namespace
/// Add all required expressions for missing columns calculation
void addDefaultRequiredExpressionsRecursively(
const Block & block, const String & required_column_name, DataTypePtr required_column_type,
const ColumnsDescription & columns, ASTPtr default_expr_list_accum, NameSet & added_columns, bool null_as_default)
const Block & block,
const String & required_column_name,
DataTypePtr required_column_type,
const ColumnsDescription & columns,
ASTPtr default_expr_list_accum,
NameSet & added_columns,
bool null_as_default)
{
checkStackSize();
@ -271,6 +277,53 @@ static std::unordered_map<String, ColumnPtr> collectOffsetsColumns(
return offsets_columns;
}
static ColumnPtr createColumnWithDefaultValue(const IDataType & data_type, const String & subcolumn_name, size_t num_rows)
{
auto column = data_type.createColumnConstWithDefaultValue(num_rows);
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
if (subcolumn_name.empty())
return column->convertToFullColumnIfConst();
/// Firstly get subcolumn from const column and then replicate.
column = assert_cast<const ColumnConst &>(*column).getDataColumnPtr();
column = data_type.getSubcolumn(subcolumn_name, column);
return ColumnConst::create(std::move(column), num_rows)->convertToFullColumnIfConst();
}
static bool hasDefault(const StorageMetadataPtr & metadata_snapshot, const NameAndTypePair & column)
{
if (!metadata_snapshot)
return false;
const auto & columns = metadata_snapshot->getColumns();
if (columns.has(column.name))
return columns.hasDefault(column.name);
auto name_in_storage = column.getNameInStorage();
return columns.hasDefault(name_in_storage);
}
static String removeTupleElementsFromSubcolumn(String subcolumn_name, const Names & tuple_elements)
{
/// Add a dot to the end of name for convenience.
subcolumn_name += ".";
for (const auto & elem : tuple_elements)
{
auto pos = subcolumn_name.find(elem + ".");
if (pos != std::string::npos)
subcolumn_name.erase(pos, elem.size() + 1);
}
if (subcolumn_name.ends_with("."))
subcolumn_name.pop_back();
return subcolumn_name;
}
void fillMissingColumns(
Columns & res_columns,
size_t num_rows,
@ -296,21 +349,17 @@ void fillMissingColumns(
auto requested_column = requested_columns.begin();
for (size_t i = 0; i < num_columns; ++i, ++requested_column)
{
const auto & [name, type] = *requested_column;
if (res_columns[i] && partially_read_columns.contains(name))
if (res_columns[i] && partially_read_columns.contains(requested_column->name))
res_columns[i] = nullptr;
if (res_columns[i])
continue;
if (metadata_snapshot && metadata_snapshot->getColumns().hasDefault(name))
/// Nothing to fill or default should be filled in evaluateMissingDefaults
if (res_columns[i] || hasDefault(metadata_snapshot, *requested_column))
continue;
std::vector<ColumnPtr> current_offsets;
size_t num_dimensions = 0;
const auto * array_type = typeid_cast<const DataTypeArray *>(type.get());
const auto * array_type = typeid_cast<const DataTypeArray *>(requested_column->type.get());
if (array_type && !offsets_columns.empty())
{
num_dimensions = getNumberOfDimensions(*array_type);
@ -345,20 +394,34 @@ void fillMissingColumns(
if (!current_offsets.empty())
{
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto scalar_type = createArrayOfType(getBaseTypeOfArray(type), num_empty_dimensions);
Names tuple_elements;
auto serialization = IDataType::getSerialization(*requested_column);
/// For Nested columns collect names of tuple elements and skip them while getting the base type of array.
IDataType::forEachSubcolumn([&](const auto & path, const auto &, const auto &)
{
if (path.back().type == ISerialization::Substream::TupleElement)
tuple_elements.push_back(path.back().name_of_substream);
}, ISerialization::SubstreamData(serialization));
/// The number of dimensions that belongs to the array itself but not shared in Nested column.
/// For example for column "n Nested(a UInt64, b Array(UInt64))" this value is 0 for `n.a` and 1 for `n.b`.
size_t num_empty_dimensions = num_dimensions - current_offsets.size();
auto base_type = getBaseTypeOfArray(requested_column->getTypeInStorage(), tuple_elements);
auto scalar_type = createArrayOfType(base_type, num_empty_dimensions);
size_t data_size = assert_cast<const ColumnUInt64 &>(*current_offsets.back()).getData().back();
res_columns[i] = scalar_type->createColumnConstWithDefaultValue(data_size)->convertToFullColumnIfConst();
/// Remove names of tuple elements because they are already processed by 'getBaseTypeOfArray'.
auto subcolumn_name = removeTupleElementsFromSubcolumn(requested_column->getSubcolumnName(), tuple_elements);
res_columns[i] = createColumnWithDefaultValue(*scalar_type, subcolumn_name, data_size);
for (auto it = current_offsets.rbegin(); it != current_offsets.rend(); ++it)
res_columns[i] = ColumnArray::create(res_columns[i], *it);
}
else
{
/// We must turn a constant column into a full column because the interpreter could infer
/// that it is constant everywhere but in some blocks (from other parts) it can be a full column.
res_columns[i] = type->createColumnConstWithDefaultValue(num_rows)->convertToFullColumnIfConst();
res_columns[i] = createColumnWithDefaultValue(*requested_column->getTypeInStorage(), requested_column->getSubcolumnName(), num_rows);
}
}
}

View File

@ -417,20 +417,20 @@ void updatePrewhereOutputsIfNeeded(SelectQueryInfo & table_expression_query_info
/// We evaluate sampling for Merge lazily so we need to get all the columns
if (storage_snapshot->storage.getName() == "Merge")
{
const auto columns = storage_snapshot->getMetadataForQuery()->getColumns().getAll();
const auto columns = storage_snapshot->metadata->getColumns().getAll();
for (const auto & column : columns)
required_columns.insert(column.name);
}
else
{
auto columns_required_for_sampling = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForSampling();
auto columns_required_for_sampling = storage_snapshot->metadata->getColumnsRequiredForSampling();
required_columns.insert(columns_required_for_sampling.begin(), columns_required_for_sampling.end());
}
}
if (table_expression_modifiers->hasFinal())
{
auto columns_required_for_final = storage_snapshot->getMetadataForQuery()->getColumnsRequiredForFinal();
auto columns_required_for_final = storage_snapshot->metadata->getColumnsRequiredForFinal();
required_columns.insert(columns_required_for_final.begin(), columns_required_for_final.end());
}
}

View File

@ -97,7 +97,9 @@ void CompletedPipelineExecutor::execute()
break;
if (is_cancelled_callback())
{
data->executor->cancel();
}
}
if (data->has_exception)
@ -116,7 +118,9 @@ CompletedPipelineExecutor::~CompletedPipelineExecutor()
try
{
if (data && data->executor)
{
data->executor->cancel();
}
}
catch (...)
{

View File

@ -32,7 +32,7 @@ public:
private:
Chunk read() override;
void onCancel() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -32,7 +32,7 @@ public:
protected:
Chunk read() override;
void onCancel() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -64,7 +64,7 @@ public:
protected:
Chunk read() override;
void onCancel() override { is_stopped = 1; }
void onCancel() noexcept override { is_stopped = 1; }
private:
void prepareFileReader();

View File

@ -34,7 +34,7 @@ public:
protected:
Chunk read() override;
void onCancel() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -96,7 +96,7 @@ namespace DB
}
void ParallelFormattingOutputFormat::finishAndWait()
void ParallelFormattingOutputFormat::finishAndWait() noexcept
{
emergency_stop = true;

View File

@ -122,7 +122,7 @@ public:
started_prefix = true;
}
void onCancel() override
void onCancel() noexcept override
{
finishAndWait();
}
@ -268,7 +268,7 @@ private:
bool collected_suffix = false;
bool collected_finalize = false;
void finishAndWait();
void finishAndWait() noexcept;
void onBackgroundException()
{

View File

@ -137,7 +137,7 @@ private:
Chunk read() final;
void onCancel() final
void onCancel() noexcept final
{
/*
* The format parsers themselves are not being cancelled here, so we'll
@ -292,7 +292,7 @@ private:
first_parser_finished.wait();
}
void finishAndWait()
void finishAndWait() noexcept
{
/// Defending concurrent segmentator thread join
std::lock_guard finish_and_wait_lock(finish_and_wait_mutex);

View File

@ -68,7 +68,7 @@ public:
private:
Chunk read() override;
void onCancel() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -270,7 +270,7 @@ void ParquetBlockOutputFormat::resetFormatterImpl()
staging_bytes = 0;
}
void ParquetBlockOutputFormat::onCancel()
void ParquetBlockOutputFormat::onCancel() noexcept
{
is_stopped = true;
}

View File

@ -112,7 +112,7 @@ private:
void consume(Chunk) override;
void finalizeImpl() override;
void resetFormatterImpl() override;
void onCancel() override;
void onCancel() noexcept override;
void writeRowGroup(std::vector<Chunk> chunks);
void writeUsingArrow(std::vector<Chunk> chunks);

View File

@ -65,7 +65,7 @@ public:
private:
Chunk read() override;
void onCancel() override
void onCancel() noexcept override
{
is_stopped = 1;
}

View File

@ -29,7 +29,7 @@ public:
void setRowsBeforeLimit(size_t rows_before_limit) override;
void onCancel() override
void onCancel() noexcept override
{
queue.clearAndFinish();
}

View File

@ -9,7 +9,7 @@
namespace DB
{
void IProcessor::cancel()
void IProcessor::cancel() noexcept
{
bool already_cancelled = is_cancelled.exchange(true, std::memory_order_acq_rel);

View File

@ -255,7 +255,7 @@ public:
/// In case if query was cancelled executor will wait till all processors finish their jobs.
/// Generally, there is no reason to check this flag. However, it may be reasonable for long operations (e.g. i/o).
bool isCancelled() const { return is_cancelled.load(std::memory_order_acquire); }
void cancel();
void cancel() noexcept;
/// Additional method which is called in case if ports were updated while work() method.
/// May be used to stop execution in rare cases.
@ -380,7 +380,7 @@ public:
virtual void setRowsBeforeLimitCounter(RowsBeforeLimitCounterPtr /* counter */) {}
protected:
virtual void onCancel() {}
virtual void onCancel() noexcept {}
std::atomic<bool> is_cancelled{false};

View File

@ -757,9 +757,7 @@ std::optional<String> optimizeUseAggregateProjections(QueryPlan::Node & node, Qu
else
{
auto storage_snapshot = reading->getStorageSnapshot();
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
proj_snapshot->addProjection(best_candidate->projection);
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, best_candidate->projection->metadata);
auto projection_query_info = query_info;
projection_query_info.prewhere_info = nullptr;
projection_query_info.filter_actions_dag = nullptr;

View File

@ -193,9 +193,7 @@ std::optional<String> optimizeUseNormalProjections(Stack & stack, QueryPlan::Nod
}
auto storage_snapshot = reading->getStorageSnapshot();
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, storage_snapshot->metadata);
proj_snapshot->addProjection(best_candidate->projection);
auto proj_snapshot = std::make_shared<StorageSnapshot>(storage_snapshot->storage, best_candidate->projection->metadata);
auto query_info_copy = query_info;
query_info_copy.prewhere_info = nullptr;

View File

@ -285,7 +285,6 @@ ReadFromMergeTree::ReadFromMergeTree(
, all_column_names(std::move(all_column_names_))
, data(data_)
, actions_settings(ExpressionActionsSettings::fromContext(context_))
, metadata_for_reading(storage_snapshot->getMetadataForQuery())
, block_size{
.max_block_size_rows = max_block_size_,
.preferred_block_size_bytes = context->getSettingsRef().preferred_block_size_bytes,
@ -327,7 +326,7 @@ ReadFromMergeTree::ReadFromMergeTree(
updateSortDescriptionForOutputStream(
*output_stream,
storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
storage_snapshot->metadata->getSortingKeyColumns(),
getSortDirection(),
query_info.input_order_info,
prewhere_info,
@ -782,7 +781,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
Names in_order_column_names_to_read(column_names);
/// Add columns needed to calculate the sorting expression
for (const auto & column_name : metadata_for_reading->getColumnsRequiredForSortingKey())
for (const auto & column_name : storage_snapshot->metadata->getColumnsRequiredForSortingKey())
{
if (column_names_set.contains(column_name))
continue;
@ -802,10 +801,10 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreams(RangesInDataParts && parts_
info.use_uncompressed_cache);
};
auto sorting_expr = metadata_for_reading->getSortingKey().expression;
auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression;
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),
storage_snapshot->metadata->getPrimaryKey(),
std::move(sorting_expr),
std::move(parts_with_ranges),
num_streams,
@ -883,7 +882,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (prewhere_info)
{
NameSet sorting_columns;
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes())
sorting_columns.insert(column.name);
have_input_columns_removed_after_prewhere = restorePrewhereInputs(*prewhere_info, sorting_columns);
@ -1038,12 +1037,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (need_preliminary_merge || output_each_partition_through_separate_port)
{
size_t prefix_size = input_order_info->used_prefix_of_sorting_key_size;
auto order_key_prefix_ast = metadata_for_reading->getSortingKey().expression_list_ast->clone();
auto order_key_prefix_ast = storage_snapshot->metadata->getSortingKey().expression_list_ast->clone();
order_key_prefix_ast->children.resize(prefix_size);
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, metadata_for_reading->getColumns().getAllPhysical());
auto syntax_result = TreeRewriter(context).analyze(order_key_prefix_ast, storage_snapshot->metadata->getColumns().getAllPhysical());
auto sorting_key_prefix_expr = ExpressionAnalyzer(order_key_prefix_ast, syntax_result, context).getActionsDAG(false);
const auto & sorting_columns = metadata_for_reading->getSortingKey().column_names;
const auto & sorting_columns = storage_snapshot->metadata->getSortingKey().column_names;
SortDescription sort_description;
sort_description.compile_sort_description = settings.compile_sort_description;
@ -1150,7 +1149,7 @@ bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
if (settings.do_not_merge_across_partitions_select_final.changed)
return settings.do_not_merge_across_partitions_select_final;
if (!metadata_for_reading->hasPrimaryKey() || !metadata_for_reading->hasPartitionKey())
if (!storage_snapshot->metadata->hasPrimaryKey() || !storage_snapshot->metadata->hasPartitionKey())
return false;
/** To avoid merging parts across partitions we want result of partition key expression for
@ -1160,11 +1159,11 @@ bool ReadFromMergeTree::doNotMergePartsAcrossPartitionsFinal() const
* in primary key, then for same primary key column values, result of partition key expression
* will be the same.
*/
const auto & partition_key_expression = metadata_for_reading->getPartitionKey().expression;
const auto & partition_key_expression = storage_snapshot->metadata->getPartitionKey().expression;
if (partition_key_expression->getActionsDAG().hasNonDeterministic())
return false;
const auto & primary_key_columns = metadata_for_reading->getPrimaryKey().column_names;
const auto & primary_key_columns = storage_snapshot->metadata->getPrimaryKey().column_names;
NameSet primary_key_columns_set(primary_key_columns.begin(), primary_key_columns.end());
const auto & partition_key_required_columns = partition_key_expression->getRequiredColumns();
@ -1217,12 +1216,12 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
/// we will store lonely parts with level > 0 to use parallel select on them.
RangesInDataParts non_intersecting_parts_by_primary_key;
auto sorting_expr = metadata_for_reading->getSortingKey().expression;
auto sorting_expr = storage_snapshot->metadata->getSortingKey().expression;
if (prewhere_info)
{
NameSet sorting_columns;
for (const auto & column : metadata_for_reading->getSortingKey().expression->getRequiredColumnsWithTypes())
for (const auto & column : storage_snapshot->metadata->getSortingKey().expression->getRequiredColumnsWithTypes())
sorting_columns.insert(column.name);
restorePrewhereInputs(*prewhere_info, sorting_columns);
}
@ -1253,7 +1252,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (new_parts.empty())
continue;
if (num_streams > 1 && metadata_for_reading->hasPrimaryKey())
if (num_streams > 1 && storage_snapshot->metadata->hasPrimaryKey())
{
// Let's split parts into non intersecting parts ranges and layers to ensure data parallelism of FINAL.
auto in_order_reading_step_getter = [this, &column_names, &info](auto parts)
@ -1273,7 +1272,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
data.merging_params.is_deleted_column.empty() && !reader_settings.read_in_order;
SplitPartsWithRangesByPrimaryKeyResult split_ranges_result = splitPartsWithRangesByPrimaryKey(
metadata_for_reading->getPrimaryKey(),
storage_snapshot->metadata->getPrimaryKey(),
sorting_expr,
std::move(new_parts),
num_streams,
@ -1305,7 +1304,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
if (pipes.empty())
continue;
Names sort_columns = metadata_for_reading->getSortingKeyColumns();
Names sort_columns = storage_snapshot->metadata->getSortingKeyColumns();
SortDescription sort_description;
sort_description.compile_sort_description = settings.compile_sort_description;
sort_description.min_count_to_compile_sort_description = settings.min_count_to_compile_sort_description;
@ -1313,7 +1312,7 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsFinal(
size_t sort_columns_size = sort_columns.size();
sort_description.reserve(sort_columns_size);
Names partition_key_columns = metadata_for_reading->getPartitionKey().column_names;
Names partition_key_columns = storage_snapshot->metadata->getPartitionKey().column_names;
for (size_t i = 0; i < sort_columns_size; ++i)
sort_description.emplace_back(sort_columns[i], 1, 1);
@ -1370,7 +1369,7 @@ ReadFromMergeTree::AnalysisResultPtr ReadFromMergeTree::selectRangesToRead(
return selectRangesToRead(
std::move(parts),
std::move(alter_conversions),
metadata_for_reading,
storage_snapshot->metadata,
query_info,
context,
requested_num_streams,
@ -1534,7 +1533,7 @@ void ReadFromMergeTree::applyFilters(ActionDAGNodes added_filter_nodes)
prepared_parts,
context,
query_info,
metadata_for_reading);
storage_snapshot->metadata);
}
}
@ -1703,7 +1702,7 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// update sort info for output stream
SortDescription sort_description;
const Names & sorting_key_columns = metadata_for_reading->getSortingKeyColumns();
const Names & sorting_key_columns = storage_snapshot->metadata->getSortingKeyColumns();
const Block & header = output_stream->header;
const int sort_direction = getSortDirection();
for (const auto & column_name : sorting_key_columns)
@ -1745,7 +1744,7 @@ void ReadFromMergeTree::updatePrewhereInfo(const PrewhereInfoPtr & prewhere_info
updateSortDescriptionForOutputStream(
*output_stream,
storage_snapshot->getMetadataForQuery()->getSortingKeyColumns(),
storage_snapshot->metadata->getSortingKeyColumns(),
getSortDirection(),
query_info.input_order_info,
prewhere_info,
@ -1871,7 +1870,7 @@ Pipe ReadFromMergeTree::spreadMarkRanges(
throw Exception(ErrorCodes::LOGICAL_ERROR, "Optimization isn't supposed to be used for queries with final");
/// Add columns needed to calculate the sorting expression and the sign.
for (const auto & column : metadata_for_reading->getColumnsRequiredForSortingKey())
for (const auto & column : storage_snapshot->metadata->getColumnsRequiredForSortingKey())
{
if (!names.contains(column))
{
@ -1965,10 +1964,6 @@ void ReadFromMergeTree::initializePipeline(QueryPipelineBuilder & pipeline, cons
fmt::format("{}.{}", data.getStorageID().getFullNameNotQuoted(), part.data_part->info.partition_id));
}
context->getQueryContext()->addQueryAccessInfo(partition_names);
if (storage_snapshot->projection)
context->getQueryContext()->addQueryAccessInfo(
Context::QualifiedProjectionName{.storage_id = data.getStorageID(), .projection_name = storage_snapshot->projection->name});
}
ProfileEvents::increment(ProfileEvents::SelectedParts, result.selected_parts);

View File

@ -171,7 +171,7 @@ public:
AnalysisResultPtr selectRangesToRead(bool find_exact_ranges = false) const;
StorageMetadataPtr getStorageMetadata() const { return metadata_for_reading; }
StorageMetadataPtr getStorageMetadata() const { return storage_snapshot->metadata; }
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
@ -216,8 +216,6 @@ private:
const MergeTreeData & data;
ExpressionActionsSettings actions_settings;
StorageMetadataPtr metadata_for_reading;
const MergeTreeReadTask::BlockSizeParams block_size;
size_t requested_num_streams;

View File

@ -4,6 +4,8 @@
#include <QueryPipeline/StreamLocalLimits.h>
#include <Processors/Transforms/AggregatingTransform.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Common/Exception.h>
#include <Common/Logger.h>
namespace DB
{
@ -202,9 +204,16 @@ std::optional<Chunk> RemoteSource::tryGenerate()
return chunk;
}
void RemoteSource::onCancel()
void RemoteSource::onCancel() noexcept
{
query_executor->cancel();
try
{
query_executor->cancel();
}
catch (...)
{
tryLogCurrentException(getLogger("RemoteSource"), "Error occurs on cancellation.");
}
}
void RemoteSource::onUpdatePorts()

View File

@ -38,7 +38,7 @@ public:
protected:
std::optional<Chunk> tryGenerate() override;
void onCancel() override;
void onCancel() noexcept override;
private:
bool was_query_sent = false;

View File

@ -367,7 +367,7 @@ public:
return prepareTwoLevel();
}
void onCancel() override
void onCancel() noexcept override
{
shared_data->is_cancelled.store(true, std::memory_order_seq_cst);
}

View File

@ -52,7 +52,7 @@ protected:
virtual void onConsume(Chunk chunk) = 0;
virtual GenerateResult onGenerate() = 0;
virtual void onFinish() {}
virtual void onException(std::exception_ptr /* exception */) {}
virtual void onException(std::exception_ptr /* exception */) { }
public:
ExceptionKeepingTransform(const Block & in_header, const Block & out_header, bool ignore_on_start_and_finish_ = true);

View File

@ -558,7 +558,9 @@ void TCPHandler::runImpl()
std::scoped_lock lock(out_mutex, task_callback_mutex);
if (getQueryCancellationStatus() == CancellationStatus::FULLY_CANCELLED)
{
return true;
}
sendProgress();
sendSelectProfileEvents();

View File

@ -19,6 +19,7 @@
#include <Interpreters/inplaceBlockConversions.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Interpreters/parseColumnsListForTableFunction.h>
#include <Storages/StorageView.h>
#include <Parsers/ASTAlterQuery.h>
#include <Parsers/ASTColumnDeclaration.h>
@ -1316,6 +1317,8 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Data type have to be specified for column {} to add", backQuote(column_name));
validateDataType(command.data_type, DataTypeValidationSettings(context->getSettingsRef()));
/// FIXME: Adding a new column of type Object(JSON) is broken.
/// Looks like there is something around default expression for this column (method `getDefault` is not implemented for the data type Object).
/// But after ALTER TABLE ADD COLUMN we need to fill existing rows with something (exactly the default value).
@ -1395,6 +1398,8 @@ void AlterCommands::validate(const StoragePtr & table, ContextPtr context) const
/// So we don't allow to do it for now.
if (command.data_type)
{
validateDataType(command.data_type, DataTypeValidationSettings(context->getSettingsRef()));
const GetColumnsOptions options(GetColumnsOptions::All);
const auto old_data_type = all_columns.getColumn(options, column_name).type;

View File

@ -607,7 +607,7 @@ void DistributedSink::onFinish()
}
}
void DistributedSink::onCancel()
void DistributedSink::onCancel() noexcept
{
std::lock_guard lock(execution_mutex);
if (pool && !pool->finished())
@ -618,14 +618,26 @@ void DistributedSink::onCancel()
}
catch (...)
{
tryLogCurrentException(storage.log);
tryLogCurrentException(storage.log, "Error occurs on cancellation.");
}
}
for (auto & shard_jobs : per_shard_jobs)
{
for (JobReplica & job : shard_jobs.replicas_jobs)
if (job.executor)
job.executor->cancel();
{
try
{
if (job.executor)
job.executor->cancel();
}
catch (...)
{
tryLogCurrentException(storage.log, "Error occurs on cancellation.");
}
}
}
}

View File

@ -53,7 +53,7 @@ public:
void onFinish() override;
private:
void onCancel() override;
void onCancel() noexcept override;
IColumn::Selector createSelector(const Block & source_block) const;

View File

@ -54,7 +54,7 @@ public:
String getName() const override { return "LiveViewEventsSource"; }
void onCancel() override
void onCancel() noexcept override
{
if (storage->shutdown_called)
return;

View File

@ -36,7 +36,7 @@ public:
String getName() const override { return "LiveViewSource"; }
void onCancel() override
void onCancel() noexcept override
{
if (storage->shutdown_called)
return;

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeVirtualColumns.h>
#include <Storages/MergeTree/LoadedMergeTreeDataPartInfoForReader.h>
#include <DataTypes/NestedUtils.h>
#include <DataTypes/DataTypeNested.h>
#include <Common/escapeForFileName.h>
#include <Compression/CachedCompressedReadBuffer.h>
#include <Columns/ColumnArray.h>
@ -43,6 +44,7 @@ IMergeTreeReader::IMergeTreeReader(
, alter_conversions(data_part_info_for_read->getAlterConversions())
/// For wide parts convert plain arrays of Nested to subcolumns
/// to allow to use shared offset column from cache.
, original_requested_columns(columns_)
, requested_columns(data_part_info_for_read->isWidePart()
? Nested::convertToSubcolumns(columns_)
: columns_)
@ -75,7 +77,7 @@ void IMergeTreeReader::fillVirtualColumns(Columns & columns, size_t rows) const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Filling of virtual columns is supported only for LoadedMergeTreeDataPartInfoForReader");
const auto & data_part = loaded_part_info->getDataPart();
const auto & storage_columns = storage_snapshot->getMetadataForQuery()->getColumns();
const auto & storage_columns = storage_snapshot->metadata->getColumns();
const auto & virtual_columns = storage_snapshot->virtual_columns;
auto it = requested_columns.begin();
@ -138,25 +140,33 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
{
try
{
size_t num_columns = requested_columns.size();
size_t num_columns = original_requested_columns.size();
if (res_columns.size() != num_columns)
throw Exception(ErrorCodes::LOGICAL_ERROR, "invalid number of columns passed to MergeTreeReader::fillMissingColumns. "
"Expected {}, got {}", num_columns, res_columns.size());
/// Convert columns list to block.
/// TODO: rewrite with columns interface. It will be possible after changes in ExpressionActions.
auto name_and_type = requested_columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
{
if (res_columns[pos] == nullptr)
continue;
NameSet full_requested_columns_set;
NamesAndTypesList full_requested_columns;
additional_columns.insert({res_columns[pos], name_and_type->type, name_and_type->name});
/// Convert columns list to block. And convert subcolumns to full columns.
/// Defaults should be executed on full columns to get correct values for subcolumns.
/// TODO: rewrite with columns interface. It will be possible after changes in ExpressionActions.
auto it = original_requested_columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++it)
{
auto name_in_storage = it->getNameInStorage();
if (full_requested_columns_set.emplace(name_in_storage).second)
full_requested_columns.emplace_back(name_in_storage, it->getTypeInStorage());
if (res_columns[pos])
additional_columns.insert({res_columns[pos], it->type, it->name});
}
auto dag = DB::evaluateMissingDefaults(
additional_columns, requested_columns,
additional_columns, full_requested_columns,
storage_snapshot->metadata->getColumns(),
data_part_info_for_read->getContext());
@ -170,9 +180,18 @@ void IMergeTreeReader::evaluateMissingDefaults(Block additional_columns, Columns
}
/// Move columns from block.
name_and_type = requested_columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++name_and_type)
res_columns[pos] = std::move(additional_columns.getByName(name_and_type->name).column);
it = original_requested_columns.begin();
for (size_t pos = 0; pos < num_columns; ++pos, ++it)
{
auto name_in_storage = it->getNameInStorage();
res_columns[pos] = additional_columns.getByName(name_in_storage).column;
if (it->isSubcolumn())
{
const auto & type_in_storage = it->getTypeInStorage();
res_columns[pos] = type_in_storage->getSubcolumn(it->getSubcolumnName(), res_columns[pos]);
}
}
}
catch (Exception & e)
{
@ -192,7 +211,12 @@ bool IMergeTreeReader::isSubcolumnOffsetsOfNested(const String & name_in_storage
if (!data_part_info_for_read->isWidePart() || subcolumn_name != "size0")
return false;
return Nested::isSubcolumnOfNested(name_in_storage, part_columns);
auto split = Nested::splitName(name_in_storage);
if (split.second.empty())
return false;
auto nested_column = part_columns.tryGetColumn(GetColumnsOptions::All, split.first);
return nested_column && isNested(nested_column->type);
}
String IMergeTreeReader::getColumnNameInPart(const NameAndTypePair & required_column) const

View File

@ -112,6 +112,9 @@ protected:
private:
/// Columns that are requested to read.
NamesAndTypesList original_requested_columns;
/// The same as above but with converted Arrays to subcolumns of Nested.
NamesAndTypesList requested_columns;
/// Actual columns description in part.

View File

@ -797,6 +797,16 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
}
const auto mode = global_ctx->data->getSettings()->deduplicate_merge_projection_mode;
/// Under throw mode, we still choose to drop projections due to backward compatibility since some
/// users might have projections before this change.
if (global_ctx->data->merging_params.mode != MergeTreeData::MergingParams::Ordinary
&& (mode == DeduplicateMergeProjectionMode::THROW || mode == DeduplicateMergeProjectionMode::DROP))
{
ctx->projections_iterator = ctx->tasks_for_projections.begin();
return false;
}
const auto & projections = global_ctx->metadata_snapshot->getProjections();
for (const auto & projection : projections)

View File

@ -71,8 +71,7 @@ bool injectRequiredColumnsRecursively(
/// Column doesn't have default value and don't exist in part
/// don't need to add to required set.
auto metadata_snapshot = storage_snapshot->getMetadataForQuery();
const auto column_default = metadata_snapshot->getColumns().getDefault(column_name);
const auto column_default = storage_snapshot->metadata->getColumns().getDefault(column_name);
if (!column_default)
return false;

View File

@ -3213,6 +3213,17 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
queryToString(mutation_commands.ast()));
}
/// Block the case of alter table add projection for special merge trees.
if (std::any_of(commands.begin(), commands.end(), [](const AlterCommand & c) { return c.type == AlterCommand::ADD_PROJECTION; }))
{
if (merging_params.mode != MergingParams::Mode::Ordinary
&& settings_from_storage->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Projection is fully supported in {} with deduplicate_merge_projection_mode = throw. "
"Use 'drop' or 'rebuild' option of deduplicate_merge_projection_mode.",
getName());
}
commands.apply(new_metadata, local_context);
if (AlterCommands::hasFullTextIndex(new_metadata) && !settings.allow_experimental_full_text_index)
@ -7141,12 +7152,7 @@ UInt64 MergeTreeData::estimateNumberOfRowsToRead(
MergeTreeDataSelectExecutor reader(*this);
auto result_ptr = reader.estimateNumMarksToRead(
parts,
storage_snapshot->getMetadataForQuery()->getColumns().getAll().getNames(),
storage_snapshot->metadata,
query_info,
query_context,
query_context->getSettingsRef().max_threads);
parts, {}, storage_snapshot->metadata, query_info, query_context, query_context->getSettingsRef().max_threads);
UInt64 total_rows = result_ptr->selected_rows;
if (query_info.trivial_limit > 0 && query_info.trivial_limit < total_rows)

View File

@ -60,39 +60,25 @@ void MergeTreeReaderCompact::fillColumnPositions()
for (size_t i = 0; i < columns_num; ++i)
{
const auto & column_to_read = columns_to_read[i];
auto & column_to_read = columns_to_read[i];
auto position = data_part_info_for_read->getColumnPosition(column_to_read.getNameInStorage());
bool is_array = isArray(column_to_read.type);
if (column_to_read.isSubcolumn())
{
auto storage_column_from_part = getColumnInPart(
{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()});
NameAndTypePair column_in_storage{column_to_read.getNameInStorage(), column_to_read.getTypeInStorage()};
auto storage_column_from_part = getColumnInPart(column_in_storage);
auto subcolumn_name = column_to_read.getSubcolumnName();
if (!storage_column_from_part.type->hasSubcolumn(subcolumn_name))
position.reset();
}
column_positions[i] = std::move(position);
/// If array of Nested column is missing in part,
/// we have to read its offsets if they exist.
if (!position && is_array)
{
auto column_to_read_with_subcolumns = getColumnConvertedToSubcolumnOfNested(column_to_read);
auto name_level_for_offsets = findColumnForOffsets(column_to_read_with_subcolumns);
if (name_level_for_offsets.has_value())
{
column_positions[i] = data_part_info_for_read->getColumnPosition(name_level_for_offsets->first);
columns_for_offsets[i] = name_level_for_offsets;
partially_read_columns.insert(column_to_read.name);
}
}
else
{
column_positions[i] = std::move(position);
}
if (!column_positions[i])
findPositionForMissedNested(i);
}
}
@ -115,7 +101,7 @@ NameAndTypePair MergeTreeReaderCompact::getColumnConvertedToSubcolumnOfNested(co
if (!storage_columns_with_collected_nested)
{
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withExtendedObjects();
auto options = GetColumnsOptions(GetColumnsOptions::All).withExtendedObjects();
auto storage_columns_list = Nested::collect(storage_snapshot->getColumns(options));
storage_columns_with_collected_nested = ColumnsDescription(std::move(storage_columns_list));
}
@ -125,11 +111,44 @@ NameAndTypePair MergeTreeReaderCompact::getColumnConvertedToSubcolumnOfNested(co
Nested::concatenateName(name_in_storage, subcolumn_name));
}
void MergeTreeReaderCompact::findPositionForMissedNested(size_t pos)
{
auto & column = columns_to_read[pos];
bool is_array = isArray(column.type);
bool is_offsets_subcolumn = isArray(column.getTypeInStorage()) && column.getSubcolumnName() == "size0";
if (!is_array && !is_offsets_subcolumn)
return;
NameAndTypePair column_in_storage{column.getNameInStorage(), column.getTypeInStorage()};
auto column_to_read_with_subcolumns = getColumnConvertedToSubcolumnOfNested(column_in_storage);
auto name_level_for_offsets = findColumnForOffsets(column_to_read_with_subcolumns);
if (!name_level_for_offsets)
return;
column_positions[pos] = data_part_info_for_read->getColumnPosition(name_level_for_offsets->first);
if (is_offsets_subcolumn)
{
/// Read offsets from antoher array from the same Nested column.
column = {name_level_for_offsets->first, column.getSubcolumnName(), column.getTypeInStorage(), column.type};
}
else
{
columns_for_offsets[pos] = std::move(name_level_for_offsets);
partially_read_columns.insert(column.name);
}
}
void MergeTreeReaderCompact::readData(
const NameAndTypePair & name_and_type,
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter)
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache)
{
try
{
@ -140,6 +159,13 @@ void MergeTreeReaderCompact::readData(
deserialize_settings.getter = getter;
deserialize_settings.avg_value_size_hint = avg_value_size_hints[name];
auto it = cache.find(name);
if (it != cache.end() && it->second != nullptr)
{
column = it->second;
return;
}
if (name_and_type.isSubcolumn())
{
const auto & type_in_storage = name_and_type.getTypeInStorage();
@ -163,6 +189,8 @@ void MergeTreeReaderCompact::readData(
serialization->deserializeBinaryBulkWithMultipleStreams(column, rows_to_read, deserialize_settings, deserialize_binary_bulk_state_map[name], nullptr);
}
cache[name] = column;
size_t read_rows_in_column = column->size() - column_size_before_reading;
if (read_rows_in_column != rows_to_read)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA,

View File

@ -36,6 +36,7 @@ public:
protected:
void fillColumnPositions();
NameAndTypePair getColumnConvertedToSubcolumnOfNested(const NameAndTypePair & column);
void findPositionForMissedNested(size_t pos);
using InputStreamGetter = ISerialization::InputStreamGetter;
@ -43,7 +44,8 @@ protected:
const NameAndTypePair & name_and_type,
ColumnPtr & column,
size_t rows_to_read,
const InputStreamGetter & getter);
const InputStreamGetter & getter,
ISerialization::SubstreamsCache & cache);
void readPrefix(
const NameAndTypePair & name_and_type,

View File

@ -26,6 +26,10 @@ try
{
size_t rows_to_read = data_part_info_for_read->getIndexGranularity().getMarkRows(from_mark);
/// Use cache to avoid reading the column with the same name twice.
/// It may happen if there are empty array Nested in the part.
ISerialization::SubstreamsCache cache;
for (size_t pos = 0; pos < num_columns; ++pos)
{
if (!res_columns[pos])
@ -52,7 +56,7 @@ try
};
readPrefix(columns_to_read[pos], buffer_getter, buffer_getter_for_prefix, columns_for_offsets[pos]);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter);
readData(columns_to_read[pos], column, rows_to_read, buffer_getter, cache);
}
++from_mark;

View File

@ -48,7 +48,7 @@ public:
ChunkAndProgress read();
void cancel() { is_cancelled = true; }
void cancel() noexcept { is_cancelled = true; }
const MergeTreeReaderSettings & getSettings() const { return reader_settings; }

View File

@ -215,6 +215,7 @@ struct Settings;
M(Float, primary_key_ratio_of_unique_prefix_values_to_skip_suffix_columns, 0.9f, "If the value of a column of the primary key in data part changes at least in this ratio of times, skip loading next columns in memory. This allows to save memory usage by not loading useless columns of the primary key.", 0) \
/** Projection settings. */ \
M(UInt64, max_projections, 25, "The maximum number of merge tree projections.", 0) \
M(DeduplicateMergeProjectionMode, deduplicate_merge_projection_mode, DeduplicateMergeProjectionMode::THROW, "Whether to allow create projection for the table with non-classic MergeTree, if allowed, what is the action when merge, drop or rebuild.", 0) \
#define MAKE_OBSOLETE_MERGE_TREE_SETTING(M, TYPE, NAME, DEFAULT) \
M(TYPE, NAME, DEFAULT, "Obsolete setting, does nothing.", BaseSettingsHelpers::Flags::OBSOLETE)

View File

@ -34,7 +34,18 @@ struct MergeTreeSink::DelayedChunk
};
MergeTreeSink::~MergeTreeSink() = default;
MergeTreeSink::~MergeTreeSink()
{
if (!delayed_chunk)
return;
for (auto & partition : delayed_chunk->partitions)
{
partition.temp_part.cancel();
}
delayed_chunk.reset();
}
MergeTreeSink::MergeTreeSink(
StorageMergeTree & storage_,
@ -59,13 +70,10 @@ void MergeTreeSink::onStart()
void MergeTreeSink::onFinish()
{
chassert(!isCancelled());
finishDelayedChunk();
}
void MergeTreeSink::onCancel()
{
}
void MergeTreeSink::consume(Chunk & chunk)
{
if (num_blocks_processed > 0)

View File

@ -28,7 +28,6 @@ public:
void consume(Chunk & chunk) override;
void onStart() override;
void onFinish() override;
void onCancel() override;
private:
StorageMergeTree & storage;

View File

@ -149,7 +149,7 @@ std::string MergeTreeSource::getName() const
return processor->getName();
}
void MergeTreeSource::onCancel()
void MergeTreeSource::onCancel() noexcept
{
processor->cancel();
}

View File

@ -26,7 +26,7 @@ public:
protected:
std::optional<Chunk> tryGenerate() override;
void onCancel() override;
void onCancel() noexcept override;
private:
MergeTreeSelectProcessorPtr processor;

View File

@ -3,8 +3,8 @@
#include <Storages/MergeTree/ReplicatedMergeTreeSink.h>
#include <Storages/MergeTree/InsertBlockInfo.h>
#include <Interpreters/PartLog.h>
#include <Common/Exception.h>
#include <Processors/Transforms/DeduplicationTokenTransforms.h>
#include "Common/Exception.h"
#include <Common/FailPoint.h>
#include <Common/ProfileEventsScope.h>
#include <Common/SipHash.h>
@ -155,7 +155,18 @@ ReplicatedMergeTreeSinkImpl<async_insert>::ReplicatedMergeTreeSinkImpl(
}
template<bool async_insert>
ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl() = default;
ReplicatedMergeTreeSinkImpl<async_insert>::~ReplicatedMergeTreeSinkImpl()
{
if (!delayed_chunk)
return;
for (auto & partition : delayed_chunk->partitions)
{
partition.temp_part.cancel();
}
delayed_chunk.reset();
}
template<bool async_insert>
size_t ReplicatedMergeTreeSinkImpl<async_insert>::checkQuorumPrecondition(const ZooKeeperWithFaultInjectionPtr & zookeeper)
@ -1155,8 +1166,9 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::onStart()
template<bool async_insert>
void ReplicatedMergeTreeSinkImpl<async_insert>::onFinish()
{
const auto & settings = context->getSettingsRef();
chassert(!isCancelled());
const auto & settings = context->getSettingsRef();
ZooKeeperWithFaultInjectionPtr zookeeper = ZooKeeperWithFaultInjection::createInstance(
settings.insert_keeper_fault_injection_probability,
settings.insert_keeper_fault_injection_seed,

View File

@ -34,6 +34,7 @@ namespace ErrorCodes
extern const int UNKNOWN_STORAGE;
extern const int NO_REPLICA_NAME_GIVEN;
extern const int CANNOT_EXTRACT_TABLE_STRUCTURE;
extern const int SUPPORT_IS_DISABLED;
}
@ -829,6 +830,18 @@ static StoragePtr create(const StorageFactory::Arguments & args)
"Floating point partition key is not supported: {}", metadata.partition_key.column_names[i]);
}
if (metadata.hasProjections() && args.mode == LoadingStrictnessLevel::CREATE)
{
/// Now let's handle the merge tree family. Note we only handle in the mode of CREATE due to backward compatibility.
/// Otherwise, it would fail to start in the case of existing projections with special mergetree.
if (merging_params.mode != MergeTreeData::MergingParams::Mode::Ordinary
&& storage_settings->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Projection is fully supported in {}MergeTree with deduplicate_merge_projection_mode = throw. "
"Use 'drop' or 'rebuild' option of deduplicate_merge_projection_mode.",
merging_params.getModeName());
}
if (arg_num != arg_cnt)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Wrong number of engine arguments.");

View File

@ -1,6 +1,7 @@
#include <Storages/MessageQueueSink.h>
#include <Formats/FormatFactory.h>
#include <Processors/Formats/IRowOutputFormat.h>
#include <Common/Exception.h>
#include <Common/logger_useful.h>
namespace DB
@ -79,4 +80,16 @@ void MessageQueueSink::consume(Chunk & chunk)
}
void MessageQueueSink::onCancel() noexcept
{
try
{
onFinish();
}
catch (...)
{
tryLogCurrentException(getLogger("MessageQueueSink"), "Error occurs on cancellation.");
}
}
}

View File

@ -39,7 +39,7 @@ public:
void onStart() override;
void onFinish() override;
void onCancel() override { onFinish(); }
void onCancel() noexcept override;
void onException(std::exception_ptr /* exception */) override { onFinish(); }
protected:

View File

@ -4,6 +4,7 @@
#include <Common/isValidUTF8.h>
#include <Core/Settings.h>
#include <Storages/ObjectStorage/Utils.h>
#include <base/defines.h>
namespace DB
{
@ -42,31 +43,16 @@ StorageObjectStorageSink::StorageObjectStorageSink(
void StorageObjectStorageSink::consume(Chunk & chunk)
{
std::lock_guard lock(cancel_mutex);
if (cancelled)
if (isCancelled())
return;
writer->write(getHeader().cloneWithColumns(chunk.getColumns()));
}
void StorageObjectStorageSink::onCancel()
{
std::lock_guard lock(cancel_mutex);
cancelBuffers();
releaseBuffers();
cancelled = true;
}
void StorageObjectStorageSink::onException(std::exception_ptr)
{
std::lock_guard lock(cancel_mutex);
cancelBuffers();
releaseBuffers();
}
void StorageObjectStorageSink::onFinish()
{
std::lock_guard lock(cancel_mutex);
chassert(!isCancelled());
finalizeBuffers();
releaseBuffers();
}
void StorageObjectStorageSink::finalizeBuffers()
@ -120,6 +106,12 @@ PartitionedStorageObjectStorageSink::PartitionedStorageObjectStorageSink(
{
}
StorageObjectStorageSink::~StorageObjectStorageSink()
{
if (isCancelled())
cancelBuffers();
}
SinkPtr PartitionedStorageObjectStorageSink::createSinkForPartition(const String & partition_id)
{
auto partition_bucket = replaceWildcards(configuration->getNamespace(), partition_id);

View File

@ -18,22 +18,18 @@ public:
ContextPtr context,
const std::string & blob_path = "");
~StorageObjectStorageSink() override;
String getName() const override { return "StorageObjectStorageSink"; }
void consume(Chunk & chunk) override;
void onCancel() override;
void onException(std::exception_ptr exception) override;
void onFinish() override;
private:
const Block sample_block;
std::unique_ptr<WriteBuffer> write_buf;
OutputFormatPtr writer;
bool cancelled = false;
std::mutex cancel_mutex;
void finalizeBuffers();
void releaseBuffers();

View File

@ -1,3 +1,7 @@
#include <Storages/StorageBuffer.h>
#include <Analyzer/TableNode.h>
#include <Analyzer/Utils.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
@ -23,7 +27,6 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Storages/AlterCommands.h>
#include <Storages/StorageBuffer.h>
#include <Storages/StorageFactory.h>
#include <Storages/StorageValues.h>
#include <Storages/checkAndGetLiteralArgument.h>
@ -232,6 +235,12 @@ QueryProcessingStage::Enum StorageBuffer::getQueryProcessingStage(
return QueryProcessingStage::FetchColumns;
}
bool StorageBuffer::isRemote() const
{
auto destination = getDestinationTable();
return destination && destination->isRemote();
}
void StorageBuffer::read(
QueryPlan & query_plan,
const Names & column_names,
@ -242,6 +251,29 @@ void StorageBuffer::read(
size_t max_block_size,
size_t num_streams)
{
bool allow_experimental_analyzer = local_context->getSettingsRef().allow_experimental_analyzer;
if (allow_experimental_analyzer && processed_stage > QueryProcessingStage::FetchColumns)
{
/** For query processing stages after FetchColumns, we do not allow using the same table more than once in the query.
* For example: SELECT * FROM buffer t1 JOIN buffer t2 USING (column)
* In that case, we will execute this query separately for the destination table and for the buffer, resulting in incorrect results.
*/
const auto & current_storage_id = getStorageID();
auto table_nodes = extractAllTableReferences(query_info.query_tree);
size_t count_of_current_storage = 0;
for (const auto & node : table_nodes)
{
const auto & table_node = node->as<TableNode &>();
if (table_node.getStorageID().getFullNameNotQuoted() == current_storage_id.getFullNameNotQuoted())
{
count_of_current_storage++;
if (count_of_current_storage > 1)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "StorageBuffer over Distributed does not support using the same table more than once in the query");
}
}
}
const auto & metadata_snapshot = storage_snapshot->metadata;
if (auto destination = getDestinationTable())
@ -328,13 +360,17 @@ void StorageBuffer::read(
}
}
src_table_query_info.merge_storage_snapshot = storage_snapshot;
destination->read(
query_plan, columns_intersection, destination_snapshot, src_table_query_info,
local_context, processed_stage, max_block_size, num_streams);
if (query_plan.isInitialized())
if (query_plan.isInitialized() && processed_stage <= QueryProcessingStage::FetchColumns)
{
/** The code below converts columns from metadata_snapshot to columns from destination_metadata_snapshot.
* This conversion is not applicable for processed_stage > FetchColumns.
* Instead, we rely on the converting actions at the end of this function.
*/
auto actions = addMissingDefaults(
query_plan.getCurrentDataStream().header,
header_after_adding_defaults.getNamesAndTypesList(),
@ -397,7 +433,7 @@ void StorageBuffer::read(
/// TODO: Find a way to support projections for StorageBuffer
if (processed_stage > QueryProcessingStage::FetchColumns)
{
if (local_context->getSettingsRef().allow_experimental_analyzer)
if (allow_experimental_analyzer)
{
auto storage = std::make_shared<StorageValues>(
getStorageID(),

View File

@ -84,6 +84,7 @@ public:
QueryProcessingStage::Enum processed_stage,
size_t max_block_size,
size_t num_streams) override;
bool isRemote() const override;
bool supportsParallelInsert() const override { return true; }

View File

@ -308,7 +308,8 @@ StorageDistributed::StorageDistributed(
const DistributedSettings & distributed_settings_,
LoadingStrictnessLevel mode,
ClusterPtr owned_cluster_,
ASTPtr remote_table_function_ptr_)
ASTPtr remote_table_function_ptr_,
bool is_remote_function_)
: IStorage(id_)
, WithContext(context_->getGlobalContext())
, remote_database(remote_database_)
@ -322,6 +323,7 @@ StorageDistributed::StorageDistributed(
, relative_data_path(relative_data_path_)
, distributed_settings(distributed_settings_)
, rng(randomSeed())
, is_remote_function(is_remote_function_)
{
if (!distributed_settings.flush_on_detach && distributed_settings.background_insert_batch)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Settings flush_on_detach=0 and background_insert_batch=1 are incompatible");
@ -373,38 +375,6 @@ StorageDistributed::StorageDistributed(
}
StorageDistributed::StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
ContextPtr context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
LoadingStrictnessLevel mode,
ClusterPtr owned_cluster_)
: StorageDistributed(
id_,
columns_,
constraints_,
String{},
String{},
String{},
cluster_name_,
context_,
sharding_key_,
storage_policy_name_,
relative_data_path_,
distributed_settings_,
mode,
std::move(owned_cluster_),
remote_table_function_ptr_)
{
}
QueryProcessingStage::Enum StorageDistributed::getQueryProcessingStage(
ContextPtr local_context,
QueryProcessingStage::Enum to_stage,
@ -901,7 +871,7 @@ void StorageDistributed::read(
sharding_key_column_name,
distributed_settings,
shard_filter_generator,
/* is_remote_function= */ static_cast<bool>(owned_cluster));
is_remote_function);
/// This is a bug, it is possible only when there is no shards to query, and this is handled earlier.
if (!query_plan.isInitialized())

View File

@ -61,21 +61,8 @@ public:
const DistributedSettings & distributed_settings_,
LoadingStrictnessLevel mode,
ClusterPtr owned_cluster_ = {},
ASTPtr remote_table_function_ptr_ = {});
StorageDistributed(
const StorageID & id_,
const ColumnsDescription & columns_,
const ConstraintsDescription & constraints_,
ASTPtr remote_table_function_ptr_,
const String & cluster_name_,
ContextPtr context_,
const ASTPtr & sharding_key_,
const String & storage_policy_name_,
const String & relative_data_path_,
const DistributedSettings & distributed_settings_,
LoadingStrictnessLevel mode,
ClusterPtr owned_cluster_ = {});
ASTPtr remote_table_function_ptr_ = {},
bool is_remote_function_ = false);
~StorageDistributed() override;
@ -287,6 +274,8 @@ private:
// For random shard index generation
mutable std::mutex rng_mutex;
pcg64 rng;
bool is_remote_function;
};
}

View File

@ -53,6 +53,7 @@
#include <Common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <Common/re2.h>
#include "base/defines.h"
#include <Core/Settings.h>
@ -1767,6 +1768,12 @@ public:
initialize();
}
~StorageFileSink() override
{
if (isCancelled())
cancelBuffers();
}
void initialize()
{
std::unique_ptr<WriteBufferFromFileDescriptor> naked_buffer;
@ -1800,37 +1807,14 @@ public:
void consume(Chunk & chunk) override
{
std::lock_guard cancel_lock(cancel_mutex);
if (cancelled)
if (isCancelled())
return;
writer->write(getHeader().cloneWithColumns(chunk.getColumns()));
}
void onCancel() override
{
std::lock_guard cancel_lock(cancel_mutex);
cancelBuffers();
releaseBuffers();
cancelled = true;
}
void onException(std::exception_ptr exception) override
{
std::lock_guard cancel_lock(cancel_mutex);
try
{
std::rethrow_exception(exception);
}
catch (...)
{
/// An exception context is needed to proper delete write buffers without finalization
releaseBuffers();
}
}
void onFinish() override
{
std::lock_guard cancel_lock(cancel_mutex);
chassert(!isCancelled());
finalizeBuffers();
}
@ -1885,9 +1869,6 @@ private:
int flags;
std::unique_lock<std::shared_timed_mutex> lock;
std::mutex cancel_mutex;
bool cancelled = false;
};
class PartitionedStorageFileSink : public PartitionedSink

View File

@ -602,7 +602,7 @@ std::vector<ReadFromMerge::ChildPlan> ReadFromMerge::createChildrenPlans(SelectQ
ASTPtr required_columns_expr_list = std::make_shared<ASTExpressionList>();
ASTPtr column_expr;
auto sample_block = merge_storage_snapshot->getMetadataForQuery()->getSampleBlock();
auto sample_block = merge_storage_snapshot->metadata->getSampleBlock();
for (const auto & column : real_column_names)
{

View File

@ -1568,10 +1568,12 @@ bool StorageMergeTree::optimize(
{
assertNotReadonly();
if (deduplicate && getInMemoryMetadataPtr()->hasProjections())
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
if (deduplicate && getInMemoryMetadataPtr()->hasProjections()
&& getSettings()->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"OPTIMIZE DEDUPLICATE query is not supported for table {} as it has projections. "
"User should drop all the projections manually before running the query",
"User should drop all the projections manually before running the query, "
"or consider drop or rebuild option of deduplicate_merge_projection_mode",
getStorageID().getTableName());
if (deduplicate)

View File

@ -5794,10 +5794,12 @@ bool StorageReplicatedMergeTree::optimize(
if (!is_leader)
throw Exception(ErrorCodes::NOT_A_LEADER, "OPTIMIZE cannot be done on this replica because it is not a leader");
if (deduplicate && getInMemoryMetadataPtr()->hasProjections())
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
if (deduplicate && getInMemoryMetadataPtr()->hasProjections()
&& getSettings()->deduplicate_merge_projection_mode == DeduplicateMergeProjectionMode::THROW)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"OPTIMIZE DEDUPLICATE query is not supported for table {} as it has projections. "
"User should drop all the projections manually before running the query",
"User should drop all the projections manually before running the query, "
"or consider drop or rebuild option of deduplicate_merge_projection_mode",
getStorageID().getTableName());
if (cleanup)

View File

@ -63,7 +63,6 @@ std::shared_ptr<StorageSnapshot> StorageSnapshot::clone(DataPtr data_) const
{
auto res = std::make_shared<StorageSnapshot>(storage, metadata, object_columns);
res->projection = projection;
res->data = std::move(data_);
return res;
@ -79,7 +78,7 @@ ColumnsDescription StorageSnapshot::getAllColumnsDescription() const
NamesAndTypesList StorageSnapshot::getColumns(const GetColumnsOptions & options) const
{
auto all_columns = getMetadataForQuery()->getColumns().get(options);
auto all_columns = metadata->getColumns().get(options);
if (options.with_extended_objects)
extendObjectColumns(all_columns, object_columns, options.with_subcolumns);
@ -113,7 +112,7 @@ NamesAndTypesList StorageSnapshot::getColumnsByNames(const GetColumnsOptions & o
std::optional<NameAndTypePair> StorageSnapshot::tryGetColumn(const GetColumnsOptions & options, const String & column_name) const
{
const auto & columns = getMetadataForQuery()->getColumns();
const auto & columns = metadata->getColumns();
auto column = columns.tryGetColumn(options, column_name);
if (column && (!column->type->hasDynamicSubcolumnsDeprecated() || !options.with_extended_objects))
return column;
@ -189,7 +188,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons
{
Block res;
const auto & columns = getMetadataForQuery()->getColumns();
const auto & columns = metadata->getColumns();
for (const auto & column_name : column_names)
{
auto column = columns.tryGetColumnOrSubcolumn(GetColumnsOptions::All, column_name);
@ -221,7 +220,7 @@ Block StorageSnapshot::getSampleBlockForColumns(const Names & column_names) cons
ColumnsDescription StorageSnapshot::getDescriptionForColumns(const Names & column_names) const
{
ColumnsDescription res;
const auto & columns = getMetadataForQuery()->getColumns();
const auto & columns = metadata->getColumns();
for (const auto & name : column_names)
{
auto column = columns.tryGetColumnOrSubcolumnDescription(GetColumnsOptions::All, name);
@ -257,7 +256,7 @@ namespace
void StorageSnapshot::check(const Names & column_names) const
{
const auto & columns = getMetadataForQuery()->getColumns();
const auto & columns = metadata->getColumns();
auto options = GetColumnsOptions(GetColumnsOptions::AllPhysical).withSubcolumns();
if (column_names.empty())

View File

@ -30,9 +30,6 @@ struct StorageSnapshot
using DataPtr = std::unique_ptr<Data>;
DataPtr data;
/// Projection that is used in query.
mutable const ProjectionDescription * projection = nullptr;
StorageSnapshot(
const IStorage & storage_,
StorageMetadataPtr metadata_);
@ -82,11 +79,6 @@ struct StorageSnapshot
void check(const Names & column_names) const;
DataTypePtr getConcreteType(const String & column_name) const;
void addProjection(const ProjectionDescription * projection_) const { projection = projection_; }
/// If we have a projection then we should use its metadata.
StorageMetadataPtr getMetadataForQuery() const { return projection ? projection->metadata : metadata; }
};
using StorageSnapshotPtr = std::shared_ptr<StorageSnapshot>;

Some files were not shown because too many files have changed in this diff Show More