merge master

This commit is contained in:
Ivan Blinkov 2018-03-02 22:25:56 +03:00
commit 00524335a4
465 changed files with 10412 additions and 5224 deletions

1
.github/PULL_REQUEST_TEMPLATE.md vendored Normal file
View File

@ -0,0 +1 @@
I hereby agree to the terms of the CLA available at: https://yandex.ru/legal/cla/?lang=en

3
.gitmodules vendored
View File

@ -31,3 +31,6 @@
[submodule "contrib/re2"]
path = contrib/re2
url = https://github.com/google/re2.git
[submodule "contrib/ssl"]
path = contrib/ssl
url = https://github.com/ClickHouse-Extras/ssl.git

View File

@ -36,7 +36,7 @@ matrix:
sources:
- ubuntu-toolchain-r-test
- llvm-toolchain-trusty-5.0
packages: [ g++-7, clang-5.0, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libzookeeper-mt-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo ]
packages: [ g++-7, clang-5.0, lld-5.0, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libzookeeper-mt-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo ]
env:
- MATRIX_EVAL="export CC=clang-5.0 && export CXX=clang++-5.0"
@ -118,7 +118,7 @@ matrix:
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-6.0 && export DEB_CXX=clang++-6.0 && export DIST=bionic && export EXTRAPACKAGES=clang-6.0"
# - MATRIX_EVAL="export DEB_CC=clang-6.0 && export DEB_CXX=clang++-6.0 && export DIST=bionic && export EXTRAPACKAGES='clang-6.0 lld-6.0'"
#
# script:
# - utils/travis/pbuilder.sh

View File

@ -35,32 +35,16 @@ message (STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE} )
# TSan is not supported due to false positive errors in libstdc++ and necessity to rebuild libstdc++ with TSan
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel;ASan;UBSan" CACHE STRING "" FORCE)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
set (ARCH_AARCH64 1)
endif ()
if (ARCH_AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARCH_ARM 1)
endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "i386")
set (ARCH_I386 1)
endif ()
if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386)
set (ARCH_32 1)
message (WARNING "Support for 32bit platforms is highly experimental")
endif ()
if (CMAKE_SYSTEM MATCHES "Linux")
set (ARCH_LINUX 1)
endif ()
if (CMAKE_SYSTEM MATCHES "FreeBSD")
set (ARCH_FREEBSD 1)
include (cmake/arch.cmake)
if (NOT MSVC)
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wall") # -Werror is also added inside directories with our own code.
endif ()
if (ARCH_FREEBSD)
set (PLATFORM_EXTRA_CXX_FLAG "-DCLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST")
if (COMPILER_GCC OR COMPILER_CLANG)
set (CXX_WARNING_FLAGS "${CXX_WARNING_FLAGS} -Wnon-virtual-dtor")
endif ()
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wall") # -Werror is also added inside directories with our own code.
set (CXX_WARNING_FLAGS "${CXX_WARNING_FLAGS} -Wnon-virtual-dtor")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
# clang: warning: argument unused during compilation: '-stdlib=libc++'
# clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument]
@ -72,7 +56,7 @@ if (ARCH_LINUX)
endif ()
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (ENABLE_TESTS "Enables tests" ON)
option (ENABLE_TESTS "Enables tests" ${NOT_MSVC})
option (USE_STATIC_LIBRARIES "Set to FALSE to use shared libraries" ON)
option (MAKE_STATIC_LIBRARIES "Set to FALSE to make shared libraries" ${USE_STATIC_LIBRARIES})
@ -111,8 +95,10 @@ endif ()
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${CXX11_ABI_FLAGS}")
find_program (LLD_PATH NAMES lld)
find_program (GOLD_PATH NAMES gold)
string(REGEX MATCH "-?[0-9]+(.[0-9]+)?$" COMPILER_POSTFIX ${CMAKE_CXX_COMPILER})
find_program (LLD_PATH NAMES "lld${COMPILER_POSTFIX}" "lld")
find_program (GOLD_PATH NAMES "gold")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND LLD_PATH AND NOT LINKER_NAME)
set (LINKER_NAME "lld")
@ -121,8 +107,8 @@ elseif (GOLD_PATH)
endif ()
if (LINKER_NAME)
message(STATUS "Using linker: ${LINKER_NAME}")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")
message(STATUS "Using linker: ${LINKER_NAME} (selected from: LLD_PATH=${LLD_PATH}; GOLD_PATH=${GOLD_PATH}; COMPILER_POSTFIX=${COMPILER_POSTFIX})")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fuse-ld=${LINKER_NAME}")
endif ()
option (PIPE "-pipe compiler option [less /tmp usage, more ram usage]" ON)
@ -196,29 +182,7 @@ if (NOT MAKE_STATIC_LIBRARIES)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
endif ()
set (SAN_FLAGS "${SAN_FLAGS} -g -fno-omit-frame-pointer")
if (SAN_DEBUG)
set (SAN_FLAGS "${SAN_FLAGS} -O0")
else ()
set (SAN_FLAGS "${SAN_FLAGS} -O3")
endif ()
set (CMAKE_CXX_FLAGS_ASAN "${CMAKE_CXX_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_C_FLAGS_ASAN "${CMAKE_C_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_CXX_FLAGS_UBSAN "${CMAKE_CXX_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_C_FLAGS_UBSAN "${CMAKE_C_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_CXX_FLAGS_MSAN "${CMAKE_CXX_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_C_FLAGS_MSAN "${CMAKE_C_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_CXX_FLAGS_TSAN "${CMAKE_CXX_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_C_FLAGS_TSAN "${CMAKE_C_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
# clang use static linking by default
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS_ASAN "${CMAKE_EXE_LINKER_FLAGS_ASAN} -static-libasan")
set (CMAKE_EXE_LINKER_FLAGS_UBSAN "${CMAKE_EXE_LINKER_FLAGS_UBSAN} -static-libubsan")
set (CMAKE_EXE_LINKER_FLAGS_MSAN "${CMAKE_EXE_LINKER_FLAGS_MSAN} -static-libmsan")
set (CMAKE_EXE_LINKER_FLAGS_TSAN "${CMAKE_EXE_LINKER_FLAGS_TSAN} -static-libtsan")
endif ()
include (cmake/sanitize.cmake)
# Using "include-what-you-use" tool.
option (USE_INCLUDE_WHAT_YOU_USE "Use 'include-what-you-use' tool" OFF)
@ -264,7 +228,7 @@ message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE
include(GNUInstallDirs)
include (cmake/find_openssl.cmake)
include (cmake/find_ssl.cmake)
if (NOT OPENSSL_FOUND)
message (FATAL_ERROR "Need openssl for build. debian tip: sudo apt install libssl-dev")
endif ()
@ -307,13 +271,7 @@ include (libs/libcommon/cmake/find_cctz.cmake)
include (libs/libmysqlxx/cmake/find_mysqlclient.cmake)
include (libs/libdaemon/cmake/find_unwind.cmake)
set (FULL_C_FLAGS "${CMAKE_C_FLAGS} ${CMAKE_C_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
set (FULL_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CXX_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
set (FULL_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${CMAKE_EXE_LINKER_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
message (STATUS "C_FLAGS = ${FULL_C_FLAGS}")
message (STATUS "CXX_FLAGS = ${FULL_CXX_FLAGS}")
message (STATUS "LINKER_FLAGS = ${FULL_EXE_LINKER_FLAGS}")
include (cmake/print_flags.cmake)
# Directory for Yandex specific files
set (CLICKHOUSE_PRIVATE_DIR ${ClickHouse_SOURCE_DIR}/private/)

29
cmake/arch.cmake Normal file
View File

@ -0,0 +1,29 @@
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
set (ARCH_AARCH64 1)
endif ()
if (ARCH_AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")
set (ARCH_ARM 1)
endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "i386")
set (ARCH_I386 1)
endif ()
if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386)
set (ARCH_32 1)
message (WARNING "Support for 32bit platforms is highly experimental")
endif ()
if (CMAKE_SYSTEM MATCHES "Linux")
set (ARCH_LINUX 1)
endif ()
if (CMAKE_SYSTEM MATCHES "FreeBSD")
set (ARCH_FREEBSD 1)
endif ()
if (NOT MSVC)
set (NOT_MSVC 1)
endif ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (COMPILER_GCC 1)
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (COMPILER_CLANG 1)
endif ()

View File

@ -1,4 +1,4 @@
option (ENABLE_CAPNP "Enable Cap'n Proto" ON)
option (ENABLE_CAPNP "Enable Cap'n Proto" ${NOT_MSVC})
if (ENABLE_CAPNP)

View File

@ -1,6 +1,6 @@
option (USE_EMBEDDED_COMPILER "Set to TRUE to enable support for 'compile' option for query execution" FALSE)
option (ENABLE_EMBEDDED_COMPILER "Set to TRUE to enable support for 'compile' option for query execution" FALSE)
if (USE_EMBEDDED_COMPILER)
if (ENABLE_EMBEDDED_COMPILER)
# Based on source code of YT.
# Authors: Ivan Puzyrevskiy, Alexey Lukyanchikov, Ruslan Savchenko.
@ -15,10 +15,14 @@ if (USE_EMBEDDED_COMPILER)
# llvm_map_components_to_libraries - Maps LLVM used components to required libraries.
# Usage: llvm_map_components_to_libraries(REQUIRED_LLVM_LIBRARIES core jit interpreter native ...)
if (ARCH_FREEBSD)
set(LLVM_VERSION_POSTFIX "50" CACHE INTERNAL "")
if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set(LLVM_VERSION_POSTFIX "${COMPILER_POSTFIX}" CACHE STRING "")
else()
set(LLVM_VERSION_POSTFIX "-5.0" CACHE INTERNAL "")
if (ARCH_FREEBSD)
set(LLVM_VERSION_POSTFIX "50" CACHE STRING "")
else()
set(LLVM_VERSION_POSTFIX "-5.0" CACHE STRING "")
endif()
endif()
find_program(LLVM_CONFIG_EXECUTABLE
@ -28,9 +32,8 @@ if (USE_EMBEDDED_COMPILER)
mark_as_advanced(LLVM_CONFIG_EXECUTABLE)
if(NOT LLVM_CONFIG_EXECUTABLE)
message(FATAL_ERROR "Cannot find LLVM (looking for `llvm-config`). Please, provide LLVM_ROOT environment variable.")
message(FATAL_ERROR "Cannot find LLVM (looking for `llvm-config${LLVM_VERSION_POSTFIX}`, `llvm-config`, `llvm-config-devel`). Please, provide LLVM_ROOT environment variable.")
else()
set(LLVM_FOUND TRUE)
execute_process(
@ -97,4 +100,8 @@ if (USE_EMBEDDED_COMPILER)
message(STATUS "LLVM Library Directory: ${LLVM_LIBRARY_DIRS}")
message(STATUS "LLVM C++ Compiler: ${LLVM_CXXFLAGS}")
endif()
if (LLVM_FOUND AND LLVM_INCLUDE_DIRS AND LLVM_LIBRARY_DIRS)
set(USE_EMBEDDED_COMPILER TRUE)
endif()
endif()

View File

@ -1,23 +0,0 @@
set (OPENSSL_USE_STATIC_LIBS ${USE_STATIC_LIBRARIES})
if (APPLE)
set (OPENSSL_ROOT_DIR "/usr/local/opt/openssl")
# https://rt.openssl.org/Ticket/Display.html?user=guest&pass=guest&id=2232
if (USE_STATIC_LIBRARIES)
message(WARNING "Disable USE_STATIC_LIBRARIES if you have linking problems with OpenSSL on MacOS")
endif ()
endif ()
find_package (OpenSSL)
if (NOT OPENSSL_FOUND)
# Try to find manually.
set (OPENSSL_INCLUDE_PATHS "/usr/local/opt/openssl/include")
set (OPENSSL_PATHS "/usr/local/opt/openssl/lib")
find_path (OPENSSL_INCLUDE_DIR NAMES openssl/ssl.h PATHS ${OPENSSL_INCLUDE_PATHS})
find_library (OPENSSL_SSL_LIBRARY ssl PATHS ${OPENSSL_PATHS})
find_library (OPENSSL_CRYPTO_LIBRARY crypto PATHS ${OPENSSL_PATHS})
if (OPENSSL_SSL_LIBRARY AND OPENSSL_CRYPTO_LIBRARY AND OPENSSL_INCLUDE_DIR)
set (OPENSSL_LIBRARIES ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
set (OPENSSL_FOUND 1)
endif ()
endif ()
message (STATUS "Using openssl=${OPENSSL_FOUND}: ${OPENSSL_INCLUDE_DIR} : ${OPENSSL_LIBRARIES}")

View File

@ -57,8 +57,8 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
if (USE_STATIC_LIBRARIES AND USE_INTERNAL_ZLIB_LIBRARY)
list (APPEND Poco_INCLUDE_DIRS
"${ClickHouse_SOURCE_DIR}/contrib/zlib-ng/"
"${ClickHouse_BINARY_DIR}/contrib/zlib-ng/"
"${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}/"
"${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}/"
)
endif ()

View File

@ -1,4 +1,4 @@
option (ENABLE_RDKAFKA "Enable kafka" ON)
option (ENABLE_RDKAFKA "Enable kafka" ${NOT_MSVC})
if (ENABLE_RDKAFKA)

44
cmake/find_ssl.cmake Normal file
View File

@ -0,0 +1,44 @@
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${MSVC})
set (OPENSSL_USE_STATIC_LIBS ${USE_STATIC_LIBRARIES})
if (NOT USE_INTERNAL_SSL_LIBRARY)
if (APPLE)
set (OPENSSL_ROOT_DIR "/usr/local/opt/openssl")
# https://rt.openssl.org/Ticket/Display.html?user=guest&pass=guest&id=2232
if (USE_STATIC_LIBRARIES)
message(WARNING "Disable USE_STATIC_LIBRARIES if you have linking problems with OpenSSL on MacOS")
endif ()
endif ()
find_package (OpenSSL)
if (NOT OPENSSL_FOUND)
# Try to find manually.
set (OPENSSL_INCLUDE_PATHS "/usr/local/opt/openssl/include")
set (OPENSSL_PATHS "/usr/local/opt/openssl/lib")
find_path (OPENSSL_INCLUDE_DIR NAMES openssl/ssl.h PATHS ${OPENSSL_INCLUDE_PATHS})
find_library (OPENSSL_SSL_LIBRARY ssl PATHS ${OPENSSL_PATHS})
find_library (OPENSSL_CRYPTO_LIBRARY crypto PATHS ${OPENSSL_PATHS})
if (OPENSSL_SSL_LIBRARY AND OPENSSL_CRYPTO_LIBRARY AND OPENSSL_INCLUDE_DIR)
set (OPENSSL_LIBRARIES ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
set (OPENSSL_FOUND 1)
endif ()
endif ()
endif ()
if (NOT OPENSSL_FOUND)
set (USE_INTERNAL_SSL_LIBRARY 1)
set (OPENSSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/ssl")
set (OPENSSL_INCLUDE_DIR "${OPENSSL_ROOT_DIR}/include")
if (NOT USE_STATIC_LIBRARIES AND TARGET crypto-shared AND TARGET ssl-shared)
set (OPENSSL_CRYPTO_LIBRARY crypto-shared)
set (OPENSSL_SSL_LIBRARY ssl-shared)
else ()
set (OPENSSL_CRYPTO_LIBRARY crypto)
set (OPENSSL_SSL_LIBRARY ssl)
endif ()
set (OPENSSL_LIBRARIES ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
set (OPENSSL_FOUND 1)
endif ()
message (STATUS "Using ssl=${OPENSSL_FOUND}: ${OPENSSL_INCLUDE_DIR} : ${OPENSSL_LIBRARIES}")

View File

@ -5,6 +5,15 @@ if (NOT USE_INTERNAL_ZLIB_LIBRARY)
endif ()
if (NOT ZLIB_FOUND)
if (NOT MSVC)
set (INTERNAL_ZLIB_NAME "zlib-ng")
else ()
set (INTERNAL_ZLIB_NAME "zlib")
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
message (WARNING "Will use standard zlib, please clone manually:\n git clone https://github.com/madler/zlib.git ${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}")
endif ()
endif ()
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_COMPAT 1) # for zlib-ng, also enables WITH_GZFILEOP
set (WITH_NATIVE_INSTRUCTIONS ${ARCHNATIVE})
@ -15,7 +24,7 @@ if (NOT ZLIB_FOUND)
set(WITH_NEON 1 CACHE INTERNAL "")
set(WITH_ACLE 1 CACHE INTERNAL "")
endif ()
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/zlib-ng" "${ClickHouse_BINARY_DIR}/contrib/zlib-ng") # generated zconf.h
set (ZLIB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/${INTERNAL_ZLIB_NAME}" "${ClickHouse_BINARY_DIR}/contrib/${INTERNAL_ZLIB_NAME}") # generated zconf.h
set (ZLIB_INCLUDE_DIRS ${ZLIB_INCLUDE_DIR}) # for poco
set (ZLIB_FOUND 1) # for poco
if (USE_STATIC_LIBRARIES)

View File

@ -1,4 +1,3 @@
set(DIVIDE_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libdivide)
set(CITYHASH_CONTRIB_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcityhash/include)
set(COMMON_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/libs/libcommon/include ${ClickHouse_BINARY_DIR}/libs/libcommon/include)

6
cmake/print_flags.cmake Normal file
View File

@ -0,0 +1,6 @@
set (FULL_C_FLAGS "${CMAKE_C_FLAGS} ${CMAKE_C_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
set (FULL_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${CMAKE_CXX_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
set (FULL_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${CMAKE_EXE_LINKER_FLAGS_${CMAKE_BUILD_TYPE_UC}}")
message (STATUS "compiler C = ${CMAKE_C_COMPILER} ${FULL_C_FLAGS}")
message (STATUS "compiler CXX = ${CMAKE_CXX_COMPILER} ${FULL_CXX_FLAGS}")
message (STATUS "LINKER_FLAGS = ${FULL_EXE_LINKER_FLAGS}")

27
cmake/sanitize.cmake Normal file
View File

@ -0,0 +1,27 @@
set (SAN_FLAGS "${SAN_FLAGS} -g -fno-omit-frame-pointer")
if (SAN_DEBUG)
set (SAN_FLAGS "${SAN_FLAGS} -O0")
else ()
set (SAN_FLAGS "${SAN_FLAGS} -O3")
endif ()
set (CMAKE_CXX_FLAGS_ASAN "${CMAKE_CXX_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_C_FLAGS_ASAN "${CMAKE_C_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_EXE_LINKER_FLAGS_ASAN "${CMAKE_EXE_LINKER_FLAGS_ASAN} -fsanitize=address")
set (CMAKE_CXX_FLAGS_UBSAN "${CMAKE_CXX_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_C_FLAGS_UBSAN "${CMAKE_C_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_EXE_LINKER_FLAGS_UBSAN "${CMAKE_EXE_LINKER_FLAGS_UBSAN} -fsanitize=undefined")
set (CMAKE_CXX_FLAGS_MSAN "${CMAKE_CXX_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_C_FLAGS_MSAN "${CMAKE_C_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_EXE_LINKER_FLAGS_MSAN "${CMAKE_EXE_LINKER_FLAGS_MSAN} -fsanitize=memory")
set (CMAKE_CXX_FLAGS_TSAN "${CMAKE_CXX_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_C_FLAGS_TSAN "${CMAKE_C_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS_TSAN "${CMAKE_EXE_LINKER_FLAGS_TSAN} -fsanitize=thread")
# clang use static linking by default
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS_ASAN "${CMAKE_EXE_LINKER_FLAGS_ASAN} -static-libasan")
set (CMAKE_EXE_LINKER_FLAGS_UBSAN "${CMAKE_EXE_LINKER_FLAGS_UBSAN} -static-libubsan")
set (CMAKE_EXE_LINKER_FLAGS_MSAN "${CMAKE_EXE_LINKER_FLAGS_MSAN} -static-libmsan")
set (CMAKE_EXE_LINKER_FLAGS_TSAN "${CMAKE_EXE_LINKER_FLAGS_TSAN} -static-libtsan")
endif ()

View File

@ -1,4 +1,6 @@
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast")
if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast")
endif ()
if (USE_INTERNAL_BOOST_LIBRARY)
add_subdirectory (libboost)
@ -48,7 +50,7 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (zlib-ng)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# todo: make pull to Dead2/zlib-ng and remove:
# We should use same defines when including zlib.h as used when zlib compiled
target_compile_definitions (zlib PUBLIC ZLIB_COMPAT WITH_GZFILEOP)
@ -81,16 +83,39 @@ if (NOT ARCH_ARM)
add_subdirectory (libcpuid)
endif ()
if (USE_INTERNAL_SSL_LIBRARY)
if (NOT MAKE_STATIC_LIBRARIES)
set (BUILD_SHARED 1)
endif ()
set (USE_SHARED ${USE_STATIC_LIBRARIES})
add_subdirectory (ssl)
target_include_directories(${OPENSSL_CRYPTO_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_SSL_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY)
set (RDKAFKA_BUILD_EXAMPLES OFF CACHE INTERNAL "")
set (RDKAFKA_BUILD_TESTS OFF CACHE INTERNAL "")
set (RDKAFKA_BUILD_STATIC ON CACHE INTERNAL "")
set (RDKAFKA_BUILD_STATIC ${MAKE_STATIC_LIBRARIES} CACHE INTERNAL "")
mark_as_advanced (ZLIB_INCLUDE_DIR)
if (USE_INTERNAL_SSL_LIBRARY)
add_library(bundled-ssl ALIAS ${OPENSSL_SSL_LIBRARY})
set (WITH_BUNDLED_SSL 1)
endif ()
add_subdirectory (librdkafka)
if (USE_INTERNAL_SSL_LIBRARY)
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
endif ()
target_include_directories(rdkafka PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
endif ()
if (USE_INTERNAL_CAPNP_LIBRARY)
if (NOT APPLE) # tests never end
if (APPLE) # tests never end
set (BUILD_TESTING 0 CACHE INTERNAL "")
else ()
set (BUILD_TESTING ${ENABLE_TESTS} CACHE INTERNAL "")
endif ()
set (_save ${CMAKE_CXX_EXTENSIONS})
@ -102,17 +127,23 @@ endif ()
if (USE_INTERNAL_POCO_LIBRARY)
set (ALLOW_DUPLICATE_CUSTOM_TARGETS 1)
set (save_CMAKE_CXX_FLAGS ${CMAKE_CXX_FLAGS})
set (save_CMAKE_C_FLAGS ${CMAKE_C_FLAGS})
set (_save ${ENABLE_TESTS})
set (ENABLE_TESTS 0)
set (CMAKE_DISABLE_FIND_PACKAGE_ZLIB 1)
if (USE_INTERNAL_SSL_LIBRARY)
set (DISABLE_INTERNAL_OPENSSL 1)
set (ENABLE_NETSSL 0) # TODO!
set (ENABLE_CRYPTO 0) # TODO!
endif ()
add_subdirectory (poco)
unset (CMAKE_DISABLE_FIND_PACKAGE_ZLIB)
set (ENABLE_TESTS ${_save})
set (CMAKE_CXX_FLAGS ${save_CMAKE_CXX_FLAGS})
set (CMAKE_C_FLAGS ${save_CMAKE_C_FLAGS})
if (OPENSSL_FOUND)
if (OPENSSL_FOUND AND TARGET Crypto)
# Bug in poco https://github.com/pocoproject/poco/pull/2100 found on macos
target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()

View File

@ -1,5 +1,6 @@
add_definitions(-Wno-unused-variable)
add_definitions(-Wno-deprecated-declarations)
if (NOT MSVC)
add_definitions(-Wno-unused-variable -Wno-deprecated-declarations)
endif ()
add_library(boost_program_options_internal
boost_1_65_0/libs/program_options/src/cmdline.cpp

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 81d4fdfcb887f89b0f7b1e9b503cbe63e6d8366b
Subproject commit cf1ad2e9a30ee9161772dc7bc9bf6e165cc51768

1
contrib/ssl vendored Submodule

@ -0,0 +1 @@
Subproject commit 6fbe1c6f404193989c5f6a63115d80fbe34ce2a3

View File

@ -18,7 +18,9 @@ get_property (BUILD_INCLUDE_DIRECTORIES DIRECTORY ${ClickHouse_SOURCE_DIR} PROPE
string (TIMESTAMP BUILD_DATE "%Y-%m-%d" UTC)
configure_file (${CMAKE_CURRENT_SOURCE_DIR}/src/Common/config_build.cpp.in ${CONFIG_BUILD})
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra")
if (NOT MSVC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wextra")
endif ()
if (NOT NO_WERROR)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Werror")
@ -77,13 +79,17 @@ list (APPEND dbms_headers
src/AggregateFunctions/FactoryHelpers.h
src/AggregateFunctions/parseAggregateFunctionParameters.h)
list (APPEND dbms_sources src/TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_sources src/TableFunctions/ITableFunction.cpp src/TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions/TableFunctionFactory.h)
add_library(clickhouse_common_io ${SPLIT_SHARED} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (ARCH_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
endif ()
add_subdirectory(src/Common/ZooKeeper)
add_subdirectory(src/Common/ConfigProcessor)
add_subdirectory(src/Common/Config)
if (MAKE_STATIC_LIBRARIES)
add_library(dbms ${dbms_headers} ${dbms_sources})
@ -137,7 +143,7 @@ target_link_libraries (clickhouse_common_io
target_link_libraries (dbms
clickhouse_parsers
clickhouse_common_configprocessor
clickhouse_common_config
clickhouse_common_io
${MYSQLXX_LIBRARY}
${FARMHASH_LIBRARIES}
@ -162,6 +168,7 @@ endif ()
if (Poco_DataODBC_FOUND)
target_link_libraries (dbms ${Poco_DataODBC_LIBRARY})
target_include_directories (dbms PRIVATE ${ODBC_INCLUDE_DIRECTORIES})
endif()
if (Poco_MongoDB_FOUND)

View File

@ -4,13 +4,6 @@
#include <AggregateFunctions/AggregateFunctionQuantile.h>
#include <AggregateFunctions/QuantileReservoirSampler.h>
#include <AggregateFunctions/QuantileReservoirSamplerDeterministic.h>
#include <AggregateFunctions/QuantileExact.h>
#include <AggregateFunctions/QuantileExactWeighted.h>
#include <AggregateFunctions/QuantileTiming.h>
#include <AggregateFunctions/QuantileTDigest.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -55,27 +48,6 @@ AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, c
#undef FOR_NUMERIC_TYPES
struct NameQuantile { static constexpr auto name = "quantile"; };
struct NameQuantiles { static constexpr auto name = "quantiles"; };
struct NameQuantileDeterministic { static constexpr auto name = "quantileDeterministic"; };
struct NameQuantilesDeterministic { static constexpr auto name = "quantilesDeterministic"; };
struct NameQuantileExact { static constexpr auto name = "quantileExact"; };
struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExact { static constexpr auto name = "quantilesExact"; };
struct NameQuantilesExactWeighted { static constexpr auto name = "quantilesExactWeighted"; };
struct NameQuantileTiming { static constexpr auto name = "quantileTiming"; };
struct NameQuantileTimingWeighted { static constexpr auto name = "quantileTimingWeighted"; };
struct NameQuantilesTiming { static constexpr auto name = "quantilesTiming"; };
struct NameQuantilesTimingWeighted { static constexpr auto name = "quantilesTimingWeighted"; };
struct NameQuantileTDigest { static constexpr auto name = "quantileTDigest"; };
struct NameQuantileTDigestWeighted { static constexpr auto name = "quantileTDigestWeighted"; };
struct NameQuantilesTDigest { static constexpr auto name = "quantilesTDigest"; };
struct NameQuantilesTDigestWeighted { static constexpr auto name = "quantilesTDigestWeighted"; };
}
void registerAggregateFunctionsQuantile(AggregateFunctionFactory & factory)

View File

@ -158,3 +158,37 @@ public:
};
}
/// These must be exposed in header for the purpose of dynamic compilation.
#include <AggregateFunctions/QuantileReservoirSampler.h>
#include <AggregateFunctions/QuantileReservoirSamplerDeterministic.h>
#include <AggregateFunctions/QuantileExact.h>
#include <AggregateFunctions/QuantileExactWeighted.h>
#include <AggregateFunctions/QuantileTiming.h>
#include <AggregateFunctions/QuantileTDigest.h>
namespace DB
{
struct NameQuantile { static constexpr auto name = "quantile"; };
struct NameQuantiles { static constexpr auto name = "quantiles"; };
struct NameQuantileDeterministic { static constexpr auto name = "quantileDeterministic"; };
struct NameQuantilesDeterministic { static constexpr auto name = "quantilesDeterministic"; };
struct NameQuantileExact { static constexpr auto name = "quantileExact"; };
struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExact { static constexpr auto name = "quantilesExact"; };
struct NameQuantilesExactWeighted { static constexpr auto name = "quantilesExactWeighted"; };
struct NameQuantileTiming { static constexpr auto name = "quantileTiming"; };
struct NameQuantileTimingWeighted { static constexpr auto name = "quantileTimingWeighted"; };
struct NameQuantilesTiming { static constexpr auto name = "quantilesTiming"; };
struct NameQuantilesTimingWeighted { static constexpr auto name = "quantilesTimingWeighted"; };
struct NameQuantileTDigest { static constexpr auto name = "quantileTDigest"; };
struct NameQuantileTDigestWeighted { static constexpr auto name = "quantileTDigestWeighted"; };
struct NameQuantilesTDigest { static constexpr auto name = "quantilesTDigest"; };
struct NameQuantilesTDigestWeighted { static constexpr auto name = "quantilesTDigestWeighted"; };
}

View File

@ -59,7 +59,7 @@
/** This hash function is not the most optimal, but UniquesHashSet states counted with it,
  * stored in many places on disks (in the Meter), so it continues to be used.
  * stored in many places on disks (in the Yandex.Metrika), so it continues to be used.
  */
struct UniquesHashSetDefaultHash
{
@ -337,8 +337,8 @@ public:
/** Correction of a systematic error due to collisions during hashing in UInt32.
* `fixed_res(res)` formula
* - with how many different elements of fixed_res,
* when randomly scattered across 2^32 baskets,
* filled baskets with average of res is obtained.
* when randomly scattered across 2^32 buckets,
* filled buckets with average of res is obtained.
*/
size_t p32 = 1ULL << 32;
size_t fixed_res = round(p32 * (log(p32) - log(p32 - res)));

View File

@ -135,7 +135,7 @@ ASTPtr createASTIdentifierForColumnInTable(const String & column, const CollectT
{
ASTPtr database_name_identifier_node;
if (!table.database_name.empty())
database_name_identifier_node = std::make_shared<ASTIdentifier>(StringRange(), table.database_name, ASTIdentifier::Column);
database_name_identifier_node = std::make_shared<ASTIdentifier>(table.database_name, ASTIdentifier::Column);
ASTPtr table_name_identifier_node;
String table_name_or_alias;
@ -146,9 +146,9 @@ ASTPtr createASTIdentifierForColumnInTable(const String & column, const CollectT
table_name_or_alias = table.alias;
if (!table_name_or_alias.empty())
table_name_identifier_node = std::make_shared<ASTIdentifier>(StringRange(), table_name_or_alias, ASTIdentifier::Column);
table_name_identifier_node = std::make_shared<ASTIdentifier>(table_name_or_alias, ASTIdentifier::Column);
ASTPtr column_identifier_node = std::make_shared<ASTIdentifier>(StringRange(), column, ASTIdentifier::Column);
ASTPtr column_identifier_node = std::make_shared<ASTIdentifier>(column, ASTIdentifier::Column);
String compound_name;
if (database_name_identifier_node)
@ -157,8 +157,7 @@ ASTPtr createASTIdentifierForColumnInTable(const String & column, const CollectT
compound_name += table_name_or_alias + ".";
compound_name += column;
auto elem = std::make_shared<ASTIdentifier>(
StringRange(), compound_name, ASTIdentifier::Column);
auto elem = std::make_shared<ASTIdentifier>(compound_name, ASTIdentifier::Column);
if (database_name_identifier_node)
elem->children.emplace_back(std::move(database_name_identifier_node));
@ -385,7 +384,7 @@ void AnalyzeColumns::process(ASTPtr & ast, const CollectAliases & aliases, const
for (auto & child : ast->children)
{
if (select && (child.get() == select->format.get() || child.get() == select->settings.get()))
if (select && child.get() == select->settings.get())
continue;
processImpl(child, columns, aliases, tables);

View File

@ -6,6 +6,7 @@
#include <Analyzers/AnalyzeResultOfQuery.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/convertFieldToType.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -72,8 +73,8 @@ void processLiteral(const String & column_name, const ASTPtr & ast, TypeAndConst
TypeAndConstantInference::ExpressionInfo expression_info;
expression_info.node = ast;
expression_info.is_constant_expression = true;
expression_info.value = literal->value;
expression_info.data_type = applyVisitor(FieldToDataType(), expression_info.value);
expression_info.data_type = applyVisitor(FieldToDataType(), literal->value);
expression_info.value = convertFieldToType(literal->value, *expression_info.data_type);
info.emplace(column_name, std::move(expression_info));
}

View File

@ -393,7 +393,7 @@ void Connection::sendData(const Block & block, const String & name)
else
maybe_compressed_out = out;
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision);
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
}
writeVarUInt(Protocol::Client::Data, *out);

View File

@ -3,7 +3,6 @@
#include <Common/Throttler.h>
#include <Client/Connection.h>
#include <Client/ConnectionPoolWithFailover.h>
#include <Poco/ScopedLock.h>
#include <mutex>
namespace DB

View File

@ -78,6 +78,7 @@ public:
bool isFixedAndContiguous() const override { return false; }
bool valuesHaveFixedSize() const override { return nested_column->valuesHaveFixedSize(); }
size_t sizeOfValueIfFixed() const override { return null_map->sizeOfValueIfFixed() + nested_column->sizeOfValueIfFixed(); }
bool onlyNull() const override { return nested_column->isDummy(); }
/// Return the column that represents values.

View File

@ -160,7 +160,7 @@ class AIOContextPool : public ext::singleton<AIOContextPool>
const auto it = promises.find(id);
if (it == std::end(promises))
{
LOG_CRITICAL(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
LOG_ERROR(&Poco::Logger::get("AIOcontextPool"), "Found io_event with unknown id " << id);
continue;
}

View File

@ -16,8 +16,8 @@ public:
bool isCancelled() const { return counter > 0; }
/// Temporarily blocks corresponding actions (while the returned object is alive)
struct BlockHolder;
BlockHolder cancel() const { return BlockHolder(this); }
struct LockHolder;
LockHolder cancel() const { return LockHolder(this); }
/// Cancel the actions forever.
void cancelForever() const { ++counter; }
@ -26,30 +26,30 @@ public:
auto & getCounter() { return counter; }
/// Blocks related action while a BlockerHolder instance exists
struct BlockHolder
struct LockHolder
{
explicit BlockHolder(const ActionBlocker * var_ = nullptr) : var(var_)
explicit LockHolder(const ActionBlocker * var_ = nullptr) : var(var_)
{
if (var)
++var->counter;
}
BlockHolder(BlockHolder && other) noexcept
LockHolder(LockHolder && other) noexcept
{
*this = std::move(other);
}
BlockHolder & operator=(BlockHolder && other) noexcept
LockHolder & operator=(LockHolder && other) noexcept
{
var = other.var;
other.var = nullptr;
return *this;
}
BlockHolder(const BlockHolder & other) = delete;
BlockHolder & operator=(const BlockHolder & other) = delete;
LockHolder(const LockHolder & other) = delete;
LockHolder & operator=(const LockHolder & other) = delete;
~BlockHolder()
~LockHolder()
{
if (var)
--var->counter;

View File

@ -0,0 +1,9 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_config .)
add_library(clickhouse_common_config ${SPLIT_SHARED} ${clickhouse_common_config_headers} ${clickhouse_common_config_sources})
target_link_libraries (clickhouse_common_config clickhouse_common_zookeeper string_utils)
target_include_directories (clickhouse_common_config PRIVATE ${DBMS_INCLUDE_DIR})

View File

@ -377,7 +377,9 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
for (Poco::DirectoryIterator it(merge_dir_name); it != Poco::DirectoryIterator(); ++it)
{
Poco::File & file = *it;
if (file.isFile() && (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf")))
if (file.isFile()
&& (endsWith(file.path(), ".xml") || endsWith(file.path(), ".conf"))
&& !startsWith(file.path(), ".")) // skip temporary files
{
files.push_back(file.path());
}

View File

@ -2,12 +2,9 @@
#include <Poco/Util/Application.h>
#include <Poco/File.h>
#include <common/logger_useful.h>
#include <Interpreters/Context.h>
#include <Common/setThreadName.h>
#include <Common/ConfigProcessor/ConfigProcessor.h>
#include "ConfigProcessor.h"
namespace DB

View File

@ -1,9 +1,8 @@
#pragma once
#include <Common/ConfigProcessor/ConfigProcessor.h>
#include "ConfigProcessor.h"
#include <Common/ZooKeeper/Common.h>
#include <Common/ZooKeeper/ZooKeeperNodeCache.h>
#include <time.h>
#include <string>
#include <thread>

View File

@ -1,9 +0,0 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_common_configprocessor .)
add_library(clickhouse_common_configprocessor ${SPLIT_SHARED} ${clickhouse_common_configprocessor_headers} ${clickhouse_common_configprocessor_sources})
target_link_libraries (clickhouse_common_configprocessor clickhouse_common_zookeeper string_utils)
target_include_directories (clickhouse_common_configprocessor PRIVATE ${DBMS_INCLUDE_DIR})

View File

@ -9,7 +9,6 @@
#include <Poco/File.h>
#include <Poco/Exception.h>
#include <mutex>
#include <Poco/ScopedLock.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromFileDescriptor.h>

View File

@ -262,7 +262,6 @@ namespace ErrorCodes
extern const int PARTITION_ALREADY_EXISTS = 256;
extern const int PARTITION_DOESNT_EXIST = 257;
extern const int UNION_ALL_RESULT_STRUCTURES_MISMATCH = 258;
extern const int UNION_ALL_COLUMN_ALIAS_MISMATCH = 259;
extern const int CLIENT_OUTPUT_FORMAT_SPECIFIED = 260;
extern const int UNKNOWN_BLOCK_INFO_FIELD = 261;
extern const int BAD_COLLATION = 262;
@ -368,6 +367,8 @@ namespace ErrorCodes
extern const int INSERT_WAS_DEDUPLICATED = 389;
extern const int CANNOT_GET_CREATE_TABLE_QUERY = 390;
extern const int EXTERNAL_LIBRARY_ERROR = 391;
extern const int QUERY_IS_PROHIBITED = 392;
extern const int THERE_IS_NO_QUERY = 393;
extern const int KEEPER_EXCEPTION = 999;

View File

@ -30,6 +30,7 @@
M(ArenaAllocChunks) \
M(ArenaAllocBytes) \
M(FunctionExecute) \
M(TableFunctionExecute) \
M(MarkCacheHits) \
M(MarkCacheMisses) \
M(CreatedReadBufferOrdinary) \

View File

@ -1,14 +1,25 @@
#pragma once
#include <time.h>
#include <mutex>
#include <Poco/ScopedLock.h>
#include <atomic>
#include <common/Types.h>
#ifdef __APPLE__
#include <common/apple_rt.h>
#endif
namespace StopWatchDetail
{
inline UInt64 nanoseconds(clockid_t clock_type)
{
struct timespec ts;
clock_gettime(clock_type, &ts);
return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
}
/** Differs from Poco::Stopwatch only by using 'clock_gettime' instead of 'gettimeofday',
* returns nanoseconds instead of microseconds, and also by other minor differencies.
*/
@ -18,77 +29,64 @@ public:
/** CLOCK_MONOTONIC works relatively efficient (~15 million calls/sec) and doesn't lead to syscall.
* Pass CLOCK_MONOTONIC_COARSE, if you need better performance with acceptable cost of several milliseconds of inaccuracy.
*/
Stopwatch(clockid_t clock_type_ = CLOCK_MONOTONIC) : clock_type(clock_type_) { restart(); }
Stopwatch(clockid_t clock_type_ = CLOCK_MONOTONIC) : clock_type(clock_type_) { start(); }
void start() { setStart(); is_running = true; }
void stop() { updateElapsed(); is_running = false; }
void restart() { elapsed_ns = 0; start(); }
UInt64 elapsed() const { updateElapsed(); return elapsed_ns; }
UInt64 elapsedMilliseconds() const { updateElapsed(); return elapsed_ns / 1000000UL; }
double elapsedSeconds() const { updateElapsed(); return static_cast<double>(elapsed_ns) / 1000000000ULL; }
void start() { start_ns = nanoseconds(); is_running = true; }
void stop() { stop_ns = nanoseconds(); is_running = false; }
void restart() { start(); }
UInt64 elapsed() const { return is_running ? nanoseconds() - start_ns : stop_ns - start_ns; }
UInt64 elapsedMilliseconds() const { return elapsed() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsed()) / 1000000000ULL; }
private:
mutable UInt64 start_ns;
mutable UInt64 elapsed_ns;
UInt64 start_ns = 0;
UInt64 stop_ns = 0;
clockid_t clock_type;
bool is_running;
bool is_running = false;
void setStart()
{
struct timespec ts;
clock_gettime(clock_type, &ts);
start_ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
void updateElapsed() const
{
if (is_running)
{
struct timespec ts;
clock_gettime(clock_type, &ts);
UInt64 current_ns = ts.tv_sec * 1000000000ULL + ts.tv_nsec;
elapsed_ns += current_ns - start_ns;
start_ns = current_ns;
}
}
UInt64 nanoseconds() const { return StopWatchDetail::nanoseconds(clock_type); }
};
class StopwatchWithLock : public Stopwatch
class AtomicStopwatch
{
public:
/** If specified amount of time has passed and timer is not locked right now, then restarts timer and returns true.
AtomicStopwatch(clockid_t clock_type_ = CLOCK_MONOTONIC) : clock_type(clock_type_) { restart(); }
void restart() { start_ns = nanoseconds(); }
UInt64 elapsed() const { return nanoseconds() - start_ns; }
UInt64 elapsedMilliseconds() const { return elapsed() / 1000000UL; }
double elapsedSeconds() const { return static_cast<double>(elapsed()) / 1000000000ULL; }
/** If specified amount of time has passed, then restarts timer and returns true.
* Otherwise returns false.
* This is done atomically.
*/
bool lockTestAndRestart(double seconds)
bool compareAndRestart(double seconds)
{
std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
if (!lock.try_lock())
return false;
UInt64 threshold = seconds * 1000000000ULL;
UInt64 current_ns = nanoseconds();
UInt64 current_start_ns = start_ns;
if (elapsedSeconds() >= seconds)
while (true)
{
restart();
return true;
if (current_ns < current_start_ns + threshold)
return false;
if (start_ns.compare_exchange_weak(current_start_ns, current_ns))
return true;
}
else
return false;
}
struct Lock
{
StopwatchWithLock * parent = nullptr;
std::unique_lock<std::mutex> lock;
AtomicStopwatch * parent = nullptr;
Lock() {}
operator bool() const { return parent != nullptr; }
Lock(StopwatchWithLock * parent, std::unique_lock<std::mutex> && lock)
: parent(parent), lock(std::move(lock))
{
}
Lock(AtomicStopwatch * parent) : parent(parent) {}
Lock(Lock &&) = default;
@ -105,21 +103,33 @@ public:
* This is done atomically.
*
* Usage:
* if (auto lock = timer.lockTestAndRestartAfter(1))
* if (auto lock = timer.compareAndRestartDeferred(1))
* /// do some work, that must be done in one thread and not more frequently than each second.
*/
Lock lockTestAndRestartAfter(double seconds)
Lock compareAndRestartDeferred(double seconds)
{
std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
if (!lock.try_lock())
return {};
UInt64 threshold = seconds * 1000000000ULL;
UInt64 current_ns = nanoseconds();
UInt64 current_start_ns = start_ns;
if (elapsedSeconds() >= seconds)
return Lock(this, std::move(lock));
while (true)
{
if ((current_start_ns & 0x8000000000000000ULL))
return {};
return {};
if (current_ns < current_start_ns + threshold)
return {};
if (start_ns.compare_exchange_weak(current_start_ns, current_ns | 0x8000000000000000ULL))
return Lock(this);
}
}
private:
std::mutex mutex;
std::atomic<UInt64> start_ns;
std::atomic<bool> lock {false};
clockid_t clock_type;
/// Most significant bit is a lock. When it is set, compareAndRestartDeferred method will return false.
UInt64 nanoseconds() const { return StopWatchDetail::nanoseconds(clock_type) & 0x7FFFFFFFFFFFFFFFULL; }
};

View File

@ -641,7 +641,8 @@ int32_t ZooKeeper::tryMulti(const Ops & ops_, OpResultsPtr * out_results_)
int32_t ZooKeeper::tryMultiUnsafe(const Ops & ops, MultiTransactionInfo & info)
{
info.code = multiImpl(ops, &info.op_results);
info.ops = &ops;
for (const OpPtr & op : ops)
info.ops.emplace_back(op->clone());
return info.code;
}

View File

@ -499,13 +499,13 @@ struct MultiTransactionInfo
{
MultiTransactionInfo() = default;
const Ops * ops = nullptr;
Ops ops;
int32_t code = ZOK;
OpResultsPtr op_results;
bool empty() const
{
return ops == nullptr;
return ops.empty();
}
bool hasFailedOp() const
@ -515,7 +515,7 @@ struct MultiTransactionInfo
const Op & getFailedOp() const
{
return *ops->at(getFailedOpIndex(op_results, code));
return *ops.at(getFailedOpIndex(op_results, code));
}
KeeperException getException() const
@ -523,7 +523,7 @@ struct MultiTransactionInfo
if (hasFailedOp())
{
size_t i = getFailedOpIndex(op_results, code);
return KeeperException("Transaction failed at op #" + std::to_string(i) + ": " + ops->at(i)->describe(), code);
return KeeperException("Transaction failed at op #" + std::to_string(i) + ": " + ops.at(i)->describe(), code);
}
else
return KeeperException(code);

View File

@ -14,4 +14,4 @@ add_executable(zkutil_zookeeper_holder zkutil_zookeeper_holder.cpp)
target_link_libraries(zkutil_zookeeper_holder clickhouse_common_zookeeper)
add_executable (zk_many_watches_reconnect zk_many_watches_reconnect.cpp)
target_link_libraries (zk_many_watches_reconnect clickhouse_common_zookeeper clickhouse_common_configprocessor)
target_link_libraries (zk_many_watches_reconnect clickhouse_common_zookeeper clickhouse_common_config)

View File

@ -1,4 +1,4 @@
#include <Common/ConfigProcessor/ConfigProcessor.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Poco/Event.h>
#include <iostream>

View File

@ -68,3 +68,6 @@ target_link_libraries (allocator clickhouse_common_io)
add_executable (cow_columns cow_columns.cpp)
target_link_libraries (cow_columns clickhouse_common_io)
add_executable (stopwatch stopwatch.cpp)
target_link_libraries (stopwatch clickhouse_common_io)

View File

@ -167,7 +167,7 @@ void aggregate33(Map & local_map, Map & global_map, Mutex & mutex, Source::const
if (inserted && local_map.size() == threshold)
{
Poco::ScopedLock<Mutex> lock(mutex);
std::lock_guard<Mutex> lock(mutex);
for (auto & value_type : local_map)
global_map[value_type.first] += value_type.second;

View File

@ -0,0 +1,40 @@
#include <vector>
#include <thread>
#include <iostream>
#include <Common/Stopwatch.h>
int main(int, char **)
{
static constexpr size_t num_threads = 10;
static constexpr size_t num_iterations = 3;
std::vector<std::thread> threads(num_threads);
AtomicStopwatch watch;
Stopwatch total_watch;
for (size_t i = 0; i < num_threads; ++i)
{
threads[i] = std::thread([i, &watch, &total_watch]
{
size_t iteration = 0;
while (iteration < num_iterations)
{
if (auto lock = watch.compareAndRestartDeferred(1))
{
std::cerr << "Thread " << i << ": begin iteration " << iteration << ", elapsed: " << total_watch.elapsedMilliseconds() << " ms.\n";
std::this_thread::sleep_for(std::chrono::milliseconds(500));
std::cerr << "Thread " << i << ": end iteration " << iteration << ", elapsed: " << total_watch.elapsedMilliseconds() << " ms.\n";
++iteration;
}
std::this_thread::sleep_for(std::chrono::milliseconds(500));
}
});
}
for (auto & thread : threads)
thread.join();
return 0;
}

View File

@ -1,4 +1,5 @@
#include <Common/Exception.h>
#include <Common/FieldVisitors.h>
#include <Core/Block.h>
@ -6,6 +7,8 @@
#include <IO/Operators.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnConst.h>
#include <iterator>
#include <memory>
@ -18,6 +21,7 @@ namespace ErrorCodes
extern const int POSITION_OUT_OF_BOUND;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int SIZES_OF_COLUMNS_DOESNT_MATCH;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -276,13 +280,7 @@ std::string Block::dumpStructure() const
{
if (it != data.begin())
out << ", ";
out << it->name << ' ' << it->type->getName();
if (it->column)
out << ' ' << it->column->dumpStructure();
else
out << " nullptr";
it->dumpStructure(out);
}
return out.str();
}
@ -379,22 +377,64 @@ Names Block::getNames() const
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
template <typename ReturnType>
static ReturnType checkBlockStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
size_t columns = lhs.columns();
if (rhs.columns() != columns)
return false;
auto on_error = [](const std::string & message [[maybe_unused]], int code [[maybe_unused]])
{
if constexpr (std::is_same_v<ReturnType, void>)
throw Exception(message, code);
else
return false;
};
size_t columns = rhs.columns();
if (lhs.columns() != columns)
return on_error("Block structure mismatch in " + context_description + " stream: different number of columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
for (size_t i = 0; i < columns; ++i)
{
const IDataType & lhs_type = *lhs.safeGetByPosition(i).type;
const IDataType & rhs_type = *rhs.safeGetByPosition(i).type;
const auto & expected = rhs.getByPosition(i);
const auto & actual = lhs.getByPosition(i);
if (!lhs_type.equals(rhs_type))
return false;
if (actual.name != expected.name)
return on_error("Block structure mismatch in " + context_description + " stream: different names of columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (!actual.type->equals(*expected.type))
return on_error("Block structure mismatch in " + context_description + " stream: different types:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (actual.column->getName() != expected.column->getName())
return on_error("Block structure mismatch in " + context_description + " stream: different columns:\n"
+ lhs.dumpStructure() + "\n" + rhs.dumpStructure(), ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
if (actual.column->isColumnConst() && expected.column->isColumnConst())
{
Field actual_value = static_cast<const ColumnConst &>(*actual.column).getField();
Field expected_value = static_cast<const ColumnConst &>(*expected.column).getField();
if (actual_value != expected_value)
return on_error("Block structure mismatch in " + context_description + " stream: different values of constants, actual: "
+ applyVisitor(FieldVisitorToString(), actual_value) + ", expected: " + applyVisitor(FieldVisitorToString(), expected_value),
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
}
return true;
return ReturnType(true);
}
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs)
{
return checkBlockStructure<bool>(lhs, rhs, {});
}
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description)
{
checkBlockStructure<void>(lhs, rhs, context_description);
}
@ -453,12 +493,12 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
for (auto it = left_columns.rbegin(); it != left_columns.rend(); ++it)
{
lhs_diff_writer << it->prettyPrint();
lhs_diff_writer << it->dumpStructure();
lhs_diff_writer << ", position: " << lhs.getPositionByName(it->name) << '\n';
}
for (auto it = right_columns.rbegin(); it != right_columns.rend(); ++it)
{
rhs_diff_writer << it->prettyPrint();
rhs_diff_writer << it->dumpStructure();
rhs_diff_writer << ", position: " << rhs.getPositionByName(it->name) << '\n';
}
}

View File

@ -137,10 +137,13 @@ using Blocks = std::vector<Block>;
using BlocksList = std::list<Block>;
/// Compare column types for blocks. The order of the columns matters. Names do not matter.
/// Compare number of columns, data types, column types, column names, and values of constant columns.
bool blocksHaveEqualStructure(const Block & lhs, const Block & rhs);
/// Calculate difference in structure of blocks and write description into output strings.
/// Throw exception when blocks are different.
void assertBlocksHaveEqualStructure(const Block & lhs, const Block & rhs, const std::string & context_description);
/// Calculate difference in structure of blocks and write description into output strings. NOTE It doesn't compare values of constant columns.
void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out_lhs_diff, std::string & out_rhs_diff);

View File

@ -1,6 +1,7 @@
#include <Core/ColumnsWithTypeAndName.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
namespace DB
@ -19,7 +20,7 @@ ColumnWithTypeAndName ColumnWithTypeAndName::cloneEmpty() const
}
bool ColumnWithTypeAndName::operator== (const ColumnWithTypeAndName & other) const
bool ColumnWithTypeAndName::operator==(const ColumnWithTypeAndName & other) const
{
return name == other.name
&& ((!type && !other.type) || (type && other.type && type->equals(*other.type)))
@ -27,20 +28,25 @@ bool ColumnWithTypeAndName::operator== (const ColumnWithTypeAndName & other) con
}
String ColumnWithTypeAndName::prettyPrint() const
void ColumnWithTypeAndName::dumpStructure(WriteBuffer & out) const
{
out << name;
if (type)
out << ' ' << type->getName();
else
out << " nullptr";
if (column)
out << ' ' << column->dumpStructure();
else
out << " nullptr";
}
String ColumnWithTypeAndName::dumpStructure() const
{
WriteBufferFromOwnString out;
writeString(name, out);
if (type)
{
writeChar(' ', out);
writeString(type->getName(), out);
}
if (column)
{
writeChar(' ', out);
writeString(column->getName(), out);
}
dumpStructure(out);
return out.str();
}

View File

@ -7,6 +7,9 @@
namespace DB
{
class WriteBuffer;
/** Column data along with its data type and name.
* Column data could be nullptr - to represent just 'header' of column.
* Name could be either name from a table or some temporary generated name during expression evaluation.
@ -28,7 +31,9 @@ struct ColumnWithTypeAndName
ColumnWithTypeAndName cloneEmpty() const;
bool operator==(const ColumnWithTypeAndName & other) const;
String prettyPrint() const;
void dumpStructure(WriteBuffer & out) const;
String dumpStructure() const;
};
}

View File

@ -123,9 +123,7 @@ std::ostream & operator<<(std::ostream & stream, const SubqueryForSet & what)
std::ostream & operator<<(std::ostream & stream, const IAST & what)
{
stream << "IAST("
<< "query_string = " << what.query_string
<<"){";
stream << "IAST{";
what.dumpTree(stream);
stream << "}";
return stream;
@ -135,10 +133,8 @@ std::ostream & operator<<(std::ostream & stream, const ExpressionAnalyzer & what
{
stream << "ExpressionAnalyzer{"
<< "hasAggregation=" << what.hasAggregation()
<< ", RequiredColumns=" << what.getRequiredColumns()
<< ", SubqueriesForSet=" << what.getSubqueriesForSets()
<< ", ExternalTables=" << what.getExternalTables()
// TODO
<< "}";
return stream;
}

View File

@ -10,7 +10,7 @@
namespace DB
{
void AddingDefaultBlockOutputStream::write(const DB::Block & block)
void AddingDefaultBlockOutputStream::write(const Block & block)
{
Block res = block;
@ -71,9 +71,6 @@ void AddingDefaultBlockOutputStream::write(const DB::Block & block)
}
/// Computes explicitly specified values (in column_defaults) by default.
/** @todo if somehow block does not contain values for implicitly-defaulted columns that are prerequisites
* for explicitly-defaulted ones, exception will be thrown during evaluating such columns
* (implicitly-defaulted columns are evaluated on the line after following one. */
evaluateMissingDefaults(res, required_columns, column_defaults, context);
output->write(res);

View File

@ -19,16 +19,18 @@ class AddingDefaultBlockOutputStream : public IBlockOutputStream
public:
AddingDefaultBlockOutputStream(
const BlockOutputStreamPtr & output_,
const Block & header_,
NamesAndTypesList required_columns_,
const ColumnDefaults & column_defaults_,
const Context & context_,
bool only_explicit_column_defaults_)
: output(output_), required_columns(required_columns_),
: output(output_), header(header_), required_columns(required_columns_),
column_defaults(column_defaults_), context(context_),
only_explicit_column_defaults(only_explicit_column_defaults_)
{
}
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
@ -38,6 +40,7 @@ public:
private:
BlockOutputStreamPtr output;
Block header;
NamesAndTypesList required_columns;
const ColumnDefaults column_defaults;
const Context & context;

View File

@ -76,7 +76,7 @@ Block AggregatingBlockInputStream::readImpl()
AggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
}

View File

@ -21,8 +21,6 @@ struct BlockIO
BlockInputStreamPtr in;
BlockOutputStreamPtr out;
Block out_sample; /// Example of a block to be written to `out`.
/// Callbacks for query logging could be set here.
std::function<void(IBlockInputStream *, IBlockOutputStream *)> finish_callback;
std::function<void()> exception_callback;
@ -50,7 +48,6 @@ struct BlockIO
process_list_entry = rhs.process_list_entry;
in = rhs.in;
out = rhs.out;
out_sample = rhs.out_sample;
finish_callback = rhs.finish_callback;
exception_callback = rhs.exception_callback;

View File

@ -5,8 +5,8 @@
namespace DB
{
BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_)
: row_output(row_output_), first_row(true) {}
BlockOutputStreamFromRowOutputStream::BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_)
: row_output(row_output_), header(header_) {}
void BlockOutputStreamFromRowOutputStream::write(const Block & block)

View File

@ -13,7 +13,9 @@ namespace DB
class BlockOutputStreamFromRowOutputStream : public IBlockOutputStream
{
public:
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_);
BlockOutputStreamFromRowOutputStream(RowOutputStreamPtr row_output_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override { row_output->writePrefix(); }
void writeSuffix() override { row_output->writeSuffix(); }
@ -29,7 +31,8 @@ public:
private:
RowOutputStreamPtr row_output;
bool first_row;
Block header;
bool first_row = true;
};
}

View File

@ -77,11 +77,11 @@ void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, B
return;
}
for (const auto & child_stream : parent->getChildren())
parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get()))
profiling_child->getProfileInfo().collectInfosForStreamsWithName(name, res);
}
child.getProfileInfo().collectInfosForStreamsWithName(name, res);
return false;
});
}
@ -107,11 +107,11 @@ void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
for (const auto & child_stream : info_limit_or_sort->parent->getChildren())
info_limit_or_sort->parent->forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
if (const auto * profiling_child = dynamic_cast<const IProfilingBlockInputStream *>(child_stream.get()))
rows_before_limit += profiling_child->getProfileInfo().rows;
}
rows_before_limit += child.getProfileInfo().rows;
return false;
});
}
}
else

View File

@ -1,75 +0,0 @@
#include <DataStreams/CastTypeBlockInputStream.h>
#include <Interpreters/castColumn.h>
namespace DB
{
CastTypeBlockInputStream::CastTypeBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input_,
const Block & reference_definition_)
: context(context_), ref_definition(reference_definition_)
{
children.emplace_back(input_);
}
String CastTypeBlockInputStream::getName() const
{
return "CastType";
}
Block CastTypeBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block)
return block;
if (!initialized)
{
initialized = true;
initialize(block);
}
if (cast_description.empty())
return block;
size_t num_columns = block.columns();
Block res = block;
for (size_t col = 0; col < num_columns; ++col)
{
auto it = cast_description.find(col);
if (cast_description.end() != it)
{
auto & elem = res.getByPosition(col);
elem.column = castColumn(elem, it->second, context);
elem.type = it->second;
}
}
return res;
}
void CastTypeBlockInputStream::initialize(const Block & src_block)
{
for (size_t src_col = 0, num_columns = src_block.columns(); src_col < num_columns; ++src_col)
{
const auto & src_column = src_block.getByPosition(src_col);
/// Skip, if it is a problem, it will be detected on the next pipeline stage
if (!ref_definition.has(src_column.name))
continue;
const auto & ref_column = ref_definition.getByName(src_column.name);
/// Force conversion if source and destination types is different.
if (!ref_column.type->equals(*src_column.type))
cast_description.emplace(src_col, ref_column.type);
}
}
}

View File

@ -1,38 +0,0 @@
#pragma once
#include <unordered_map>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/// Implicitly converts string and numeric values to Enum, numeric types to other numeric types.
class CastTypeBlockInputStream : public IProfilingBlockInputStream
{
public:
CastTypeBlockInputStream(const Context & context,
const BlockInputStreamPtr & input,
const Block & reference_definition);
String getName() const override;
Block getHeader() const override { return ref_definition; }
protected:
Block readImpl() override;
private:
const Context & context;
Block ref_definition;
/// Initializes cast_description and prepares tmp_conversion_block
void initialize(const Block & src_block);
bool initialized = false;
/// Describes required conversions on source block
/// Contains column numbers in source block that should be converted
std::unordered_map<size_t, DataTypePtr> cast_description;
};
}

View File

@ -0,0 +1,100 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <Interpreters/castColumn.h>
#include <Columns/ColumnConst.h>
#include <Parsers/IAST.h>
namespace DB
{
namespace ErrorCodes
{
extern const int THERE_IS_NO_COLUMN;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
ConvertingBlockInputStream::ConvertingBlockInputStream(
const Context & context_,
const BlockInputStreamPtr & input,
const Block & result_header,
MatchColumnsMode mode)
: context(context_), header(result_header), conversion(header.columns())
{
children.emplace_back(input);
Block input_header = input->getHeader();
size_t num_input_columns = input_header.columns();
size_t num_result_columns = result_header.columns();
if (mode == MatchColumnsMode::Position && num_input_columns != num_result_columns)
throw Exception("Number of columns doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t result_col_num = 0; result_col_num < num_result_columns; ++result_col_num)
{
const auto & res_elem = result_header.getByPosition(result_col_num);
switch (mode)
{
case MatchColumnsMode::Position:
conversion[result_col_num] = result_col_num;
break;
case MatchColumnsMode::Name:
if (input_header.has(res_elem.name))
conversion[result_col_num] = input_header.getPositionByName(res_elem.name);
else
throw Exception("Cannot find column " + backQuoteIfNeed(res_elem.name) + " in source stream",
ErrorCodes::THERE_IS_NO_COLUMN);
break;
}
const auto & src_elem = input_header.getByPosition(conversion[result_col_num]);
/// Check constants.
if (res_elem.column->isColumnConst())
{
if (!src_elem.column->isColumnConst())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
+ " because it is non constant in source stream but must be constant in result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
else if (static_cast<const ColumnConst &>(*src_elem.column).getField() != static_cast<const ColumnConst &>(*res_elem.column).getField())
throw Exception("Cannot convert column " + backQuoteIfNeed(res_elem.name)
+ " because it is constant but values of constants are different in source and result",
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
}
/// Check conversion by dry run CAST function.
castColumn(src_elem, res_elem.type, context);
}
}
Block ConvertingBlockInputStream::readImpl()
{
Block src = children.back()->read();
if (!src)
return src;
Block res = header.cloneEmpty();
for (size_t res_pos = 0, size = conversion.size(); res_pos < size; ++res_pos)
{
const auto & src_elem = src.getByPosition(conversion[res_pos]);
auto & res_elem = res.getByPosition(res_pos);
ColumnPtr converted = castColumn(src_elem, res_elem.type, context);
if (src_elem.column->isColumnConst() && !res_elem.column->isColumnConst())
converted = converted->convertToFullColumnIfConst();
res_elem.column = std::move(converted);
}
return res;
}
}

View File

@ -0,0 +1,54 @@
#pragma once
#include <unordered_map>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
/** Convert one block structure to another:
*
* Leaves only necessary columns;
*
* Columns are searched in source first by name;
* and if there is no column with same name, then by position.
*
* Converting types of matching columns (with CAST function).
*
* Materializing columns which are const in source and non-const in result,
* throw if they are const in result and non const in source,
* or if they are const and have different values.
*/
class ConvertingBlockInputStream : public IProfilingBlockInputStream
{
public:
enum class MatchColumnsMode
{
/// Require same number of columns in source and result. Match columns by corresponding positions, regardless to names.
Position,
/// Find columns in source by their names. Allow excessive columns in source.
Name
};
ConvertingBlockInputStream(
const Context & context,
const BlockInputStreamPtr & input,
const Block & result_header,
MatchColumnsMode mode);
String getName() const override { return "Converting"; }
Block getHeader() const override { return header; }
private:
Block readImpl() override;
const Context & context;
Block header;
/// How to construct result block. Position in source block, where to get each column.
using Conversion = std::vector<size_t>;
Conversion conversion;
};
}

View File

@ -12,7 +12,6 @@ namespace DB
class CountingBlockOutputStream : public IBlockOutputStream
{
public:
CountingBlockOutputStream(const BlockOutputStreamPtr & stream_)
: stream(stream_) {}
@ -31,6 +30,7 @@ public:
return progress;
}
Block getHeader() const override { return stream->getHeader(); }
void write(const Block & block) override;
void writePrefix() override { stream->writePrefix(); }
@ -40,7 +40,6 @@ public:
String getContentType() const override { return stream->getContentType(); }
protected:
BlockOutputStreamPtr stream;
Progress progress;
ProgressCallback progress_callback;

View File

@ -1,5 +1,6 @@
#include <Interpreters/Set.h>
#include <Interpreters/Join.h>
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/CreatingSetsBlockInputStream.h>
#include <Storages/IStorage.h>
@ -34,7 +35,7 @@ void CreatingSetsBlockInputStream::readPrefixImpl()
}
const Block & CreatingSetsBlockInputStream::getTotals()
Block CreatingSetsBlockInputStream::getTotals()
{
auto input = dynamic_cast<IProfilingBlockInputStream *>(children.back().get());
@ -108,6 +109,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (!done_with_table)
{
block = materializeBlock(block);
table_out->write(block);
rows_to_transfer += block.rows();
@ -146,24 +148,20 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (table_out)
table_out->writeSuffix();
/// We will display information about how many rows and bytes are read.
size_t rows = 0;
size_t bytes = 0;
watch.stop();
subquery.source->getLeafRowsBytes(rows, bytes);
size_t head_rows = 0;
if (IProfilingBlockInputStream * profiling_in = dynamic_cast<IProfilingBlockInputStream *>(&*subquery.source))
{
head_rows = profiling_in->getProfileInfo().rows;
const BlockStreamProfileInfo & profile_info = profiling_in->getProfileInfo();
head_rows = profile_info.rows;
if (subquery.join)
subquery.join->setTotals(profiling_in->getTotals());
}
if (rows != 0)
if (head_rows != 0)
{
std::stringstream msg;
msg << std::fixed << std::setprecision(3);
@ -176,9 +174,7 @@ void CreatingSetsBlockInputStream::createOne(SubqueryForSet & subquery)
if (subquery.table)
msg << "Table with " << head_rows << " rows. ";
msg << "Read " << rows << " rows, " << bytes / 1048576.0 << " MiB in " << watch.elapsedSeconds() << " sec., "
<< static_cast<size_t>(rows / watch.elapsedSeconds()) << " rows/sec., " << bytes / 1048576.0 / watch.elapsedSeconds() << " MiB/sec.";
msg << "In " << watch.elapsedSeconds() << " sec.";
LOG_DEBUG(log, msg.rdbuf());
}
else

View File

@ -38,7 +38,7 @@ public:
Block getHeader() const override { return children.back()->getHeader(); }
/// Takes `totals` only from the main source, not from subquery sources.
const Block & getTotals() override;
Block getTotals() override;
protected:
Block readImpl() override;

View File

@ -13,7 +13,7 @@ ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr
String ExpressionBlockInputStream::getName() const { return "Expression"; }
const Block & ExpressionBlockInputStream::getTotals()
Block ExpressionBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{

View File

@ -22,7 +22,7 @@ public:
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
const Block & getTotals() override;
Block getTotals() override;
Block getHeader() const override;
protected:

View File

@ -53,7 +53,7 @@ FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input
String FilterBlockInputStream::getName() const { return "Filter"; }
const Block & FilterBlockInputStream::getTotals()
Block FilterBlockInputStream::getTotals()
{
if (IProfilingBlockInputStream * child = dynamic_cast<IProfilingBlockInputStream *>(&*children.back()))
{

View File

@ -25,7 +25,7 @@ public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const String & filter_column_name_);
String getName() const override;
const Block & getTotals() override;
Block getTotals() override;
Block getHeader() const override;
protected:

View File

@ -21,7 +21,7 @@ public:
String getName() const override
{
return "FilterColumnsBlockInputStream";
return "FilterColumns";
}
Block getHeader() const override;

View File

@ -141,66 +141,66 @@ static BlockOutputStreamPtr getOutputImpl(const String & name, WriteBuffer & buf
FormatSettingsJSON json_settings(settings.output_format_json_quote_64bit_integers, settings.output_format_json_quote_denormals);
if (name == "Native")
return std::make_shared<NativeBlockOutputStream>(buf);
return std::make_shared<NativeBlockOutputStream>(buf, 0, sample);
else if (name == "RowBinary")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<BinaryRowOutputStream>(buf));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<BinaryRowOutputStream>(buf), sample);
else if (name == "TabSeparated" || name == "TSV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample), sample);
else if (name == "TabSeparatedWithNames" || name == "TSVWithNames")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true), sample);
else if (name == "TabSeparatedWithNamesAndTypes" || name == "TSVWithNamesAndTypes")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRowOutputStream>(buf, sample, true, true), sample);
else if (name == "TabSeparatedRaw" || name == "TSVRaw")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TabSeparatedRawRowOutputStream>(buf, sample), sample);
else if (name == "CSV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample), sample);
else if (name == "CSVWithNames")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample, true));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<CSVRowOutputStream>(buf, sample, true), sample);
else if (name == "Pretty")
return std::make_shared<PrettyBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompact")
return std::make_shared<PrettyCompactBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompactMonoBlock")
{
BlockOutputStreamPtr dst = std::make_shared<PrettyCompactBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
BlockOutputStreamPtr dst = std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
auto res = std::make_shared<SquashingBlockOutputStream>(dst, settings.output_format_pretty_max_rows, 0);
res->disableFlush();
return res;
}
else if (name == "PrettySpace")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, false, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, false, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyNoEscapes")
return std::make_shared<PrettyBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "PrettyCompactNoEscapes")
return std::make_shared<PrettyCompactBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettyCompactBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "PrettySpaceNoEscapes")
return std::make_shared<PrettySpaceBlockOutputStream>(buf, true, settings.output_format_pretty_max_rows, context);
return std::make_shared<PrettySpaceBlockOutputStream>(buf, sample, true, settings.output_format_pretty_max_rows, context);
else if (name == "Vertical")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRowOutputStream>(
buf, sample, settings.output_format_pretty_max_rows));
buf, sample, settings.output_format_pretty_max_rows), sample);
else if (name == "VerticalRaw")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<VerticalRawRowOutputStream>(
buf, sample, settings.output_format_pretty_max_rows));
buf, sample, settings.output_format_pretty_max_rows), sample);
else if (name == "Values")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<ValuesRowOutputStream>(buf), sample);
else if (name == "JSON")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONRowOutputStream>(
buf, sample, settings.output_format_write_statistics, json_settings));
buf, sample, settings.output_format_write_statistics, json_settings), sample);
else if (name == "JSONCompact")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONCompactRowOutputStream>(
buf, sample, settings.output_format_write_statistics, json_settings));
buf, sample, settings.output_format_write_statistics, json_settings), sample);
else if (name == "JSONEachRow")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<JSONEachRowRowOutputStream>(
buf, sample, json_settings));
buf, sample, json_settings), sample);
else if (name == "XML")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<XMLRowOutputStream>(buf, sample,
settings.output_format_write_statistics));
settings.output_format_write_statistics), sample);
else if (name == "TSKV")
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample));
return std::make_shared<BlockOutputStreamFromRowOutputStream>(std::make_shared<TSKVRowOutputStream>(buf, sample), sample);
else if (name == "ODBCDriver")
return std::make_shared<ODBCDriverBlockOutputStream>(buf, sample);
else if (name == "Null")
return std::make_shared<NullBlockOutputStream>();
return std::make_shared<NullBlockOutputStream>(sample);
else
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
@ -211,7 +211,7 @@ BlockOutputStreamPtr FormatFactory::getOutput(const String & name, WriteBuffer &
/** Materialization is needed, because formats can use the functions `IDataType`,
* which only work with full columns.
*/
return std::make_shared<MaterializingBlockOutputStream>(getOutputImpl(name, buf, sample, context));
return std::make_shared<MaterializingBlockOutputStream>(getOutputImpl(name, buf, materializeBlock(sample), context), sample);
}
}

View File

@ -13,6 +13,10 @@ namespace ErrorCodes
}
/** It's safe to access children without mutex as long as these methods are called before first call to read, readPrefix.
*/
String IBlockInputStream::getTreeID() const
{
std::stringstream s;
@ -64,7 +68,7 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
ostr << String(indent, ' ') << getName();
if (multiplier > 1)
ostr << " × " << multiplier;
// ostr << ": " << getHeader().dumpStructure();
//ostr << ": " << getHeader().dumpStructure();
ostr << std::endl;
++indent;
@ -87,44 +91,5 @@ void IBlockInputStream::dumpTree(std::ostream & ostr, size_t indent, size_t mult
}
}
BlockInputStreams IBlockInputStream::getLeaves()
{
BlockInputStreams res;
getLeavesImpl(res, nullptr);
return res;
}
void IBlockInputStream::getLeafRowsBytes(size_t & rows, size_t & bytes)
{
BlockInputStreams leaves = getLeaves();
rows = 0;
bytes = 0;
for (BlockInputStreams::const_iterator it = leaves.begin(); it != leaves.end(); ++it)
{
if (const IProfilingBlockInputStream * profiling = dynamic_cast<const IProfilingBlockInputStream *>(&**it))
{
const BlockStreamProfileInfo & info = profiling->getProfileInfo();
rows += info.rows;
bytes += info.bytes;
}
}
}
void IBlockInputStream::getLeavesImpl(BlockInputStreams & res, const BlockInputStreamPtr & this_shared_ptr)
{
if (children.empty())
{
if (this_shared_ptr)
res.push_back(this_shared_ptr);
}
else
for (BlockInputStreams::iterator it = children.begin(); it != children.end(); ++it)
(*it)->getLeavesImpl(res, *it);
}
}

View File

@ -2,6 +2,7 @@
#include <vector>
#include <memory>
#include <mutex>
#include <functional>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
@ -89,18 +90,13 @@ public:
/// In case of isGroupedOutput or isSortedOutput, return corresponding SortDescription
virtual const SortDescription & getSortDescription() const { throw Exception("Output of " + getName() + " is not sorted", ErrorCodes::OUTPUT_IS_NOT_SORTED); }
BlockInputStreams & getChildren() { return children; }
/** Must be called before read, readPrefix.
*/
void dumpTree(std::ostream & ostr, size_t indent = 0, size_t multiplier = 1);
/// Get leaf sources (not including this one).
BlockInputStreams getLeaves();
/// Get the number of rows and bytes read in the leaf sources.
void getLeafRowsBytes(size_t & rows, size_t & bytes);
/** Check the depth of the pipeline.
* If max_depth is specified and the `depth` is greater - throw an exception.
* Must be called before read, readPrefix.
*/
size_t checkDepth(size_t max_depth) const;
@ -108,19 +104,26 @@ public:
*/
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
protected:
TableStructureReadLocks table_locks;
template <typename F>
void forEachChild(F && f)
{
std::lock_guard lock(children_mutex);
for (auto & child : children)
if (f(*child))
return;
}
protected:
BlockInputStreams children;
std::mutex children_mutex;
private:
void getLeavesImpl(BlockInputStreams & res, const BlockInputStreamPtr & this_shared_ptr);
TableStructureReadLocks table_locks;
size_t checkDepthImpl(size_t max_depth, size_t level) const;
/** Get text that identifies this source and the entire subtree.
* Unlike getID - without taking into account the parameters.
*/
/// Get text with names of this source and the entire subtree.
String getTreeID() const;
};

View File

@ -4,12 +4,12 @@
#include <vector>
#include <memory>
#include <boost/noncopyable.hpp>
#include <Core/Block.h>
namespace DB
{
class Block;
struct Progress;
class TableStructureReadLock;
@ -26,6 +26,12 @@ class IBlockOutputStream : private boost::noncopyable
public:
IBlockOutputStream() {}
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* You must pass blocks of exactly this structure to the 'write' method.
*/
virtual Block getHeader() const = 0;
/** Write block.
*/
virtual void write(const Block & block) = 0;
@ -60,7 +66,7 @@ public:
*/
void addTableLock(const TableStructureReadLockPtr & lock) { table_locks.push_back(lock); }
protected:
private:
TableStructureReadLocks table_locks;
};

View File

@ -5,6 +5,7 @@
#include <Interpreters/ProcessList.h>
#include <DataStreams/IProfilingBlockInputStream.h>
namespace DB
{
@ -15,6 +16,7 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
extern const int TOO_SLOW;
extern const int LOGICAL_ERROR;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -25,7 +27,11 @@ IProfilingBlockInputStream::IProfilingBlockInputStream()
Block IProfilingBlockInputStream::read()
{
collectAndSendTotalRowsApprox();
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
@ -62,7 +68,7 @@ Block IProfilingBlockInputStream::read()
/** If the thread is over, then we will ask all children to abort the execution.
* This makes sense when running a query with LIMIT
* - there is a situation when all the necessary data has already been read,
* but `children sources are still working,
* but children sources are still working,
* herewith they can work in separate threads or even remotely.
*/
cancel();
@ -70,6 +76,15 @@ Block IProfilingBlockInputStream::read()
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
@ -78,15 +93,21 @@ void IProfilingBlockInputStream::readPrefix()
{
readPrefixImpl();
for (auto & child : children)
child->readPrefix();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IProfilingBlockInputStream::readSuffix()
{
for (auto & child : children)
child->readSuffix();
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
@ -335,9 +356,11 @@ void IProfilingBlockInputStream::cancel()
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
p_child->cancel();
forEachProfilingChild([] (IProfilingBlockInputStream & child)
{
child.cancel();
return false;
});
}
@ -345,9 +368,11 @@ void IProfilingBlockInputStream::setProgressCallback(const ProgressCallback & ca
{
progress_callback = callback;
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
p_child->setProgressCallback(callback);
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
@ -355,71 +380,44 @@ void IProfilingBlockInputStream::setProcessListElement(ProcessListElement * elem
{
process_list_elem = elem;
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
p_child->setProcessListElement(elem);
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
const Block & IProfilingBlockInputStream::getTotals()
Block IProfilingBlockInputStream::getTotals()
{
if (totals)
return totals;
for (auto & child : children)
Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
{
const Block & res = p_child->getTotals();
if (res)
return res;
}
}
return totals;
res = child.getTotals();
if (res)
return true;
return false;
});
return res;
}
const Block & IProfilingBlockInputStream::getExtremes() const
Block IProfilingBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
for (const auto & child : children)
Block res;
forEachProfilingChild([&] (IProfilingBlockInputStream & child)
{
if (const IProfilingBlockInputStream * p_child = dynamic_cast<const IProfilingBlockInputStream *>(&*child))
{
const Block & res = p_child->getExtremes();
if (res)
return res;
}
}
return extremes;
}
void IProfilingBlockInputStream::collectTotalRowsApprox()
{
bool old_val = false;
if (!collected_total_rows_approx.compare_exchange_strong(old_val, true))
return;
for (auto & child : children)
{
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(&*child))
{
p_child->collectTotalRowsApprox();
total_rows_approx += p_child->total_rows_approx;
}
}
}
void IProfilingBlockInputStream::collectAndSendTotalRowsApprox()
{
if (collected_total_rows_approx)
return;
collectTotalRowsApprox();
progressImpl(Progress(0, 0, total_rows_approx));
res = child.getExtremes();
if (res)
return true;
return false;
});
return res;
}
}

View File

@ -26,6 +26,8 @@ using ProfilingBlockInputStreamPtr = std::shared_ptr<IProfilingBlockInputStream>
*/
class IProfilingBlockInputStream : public IBlockInputStream
{
friend struct BlockStreamProfileInfo;
public:
IProfilingBlockInputStream();
@ -56,10 +58,10 @@ public:
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data at the same time is computed in another thread.
*/
virtual const Block & getTotals();
virtual Block getTotals();
/// The same for minimums and maximums.
const Block & getExtremes() const;
Block getExtremes();
/** Set the execution progress bar callback.
@ -98,7 +100,7 @@ public:
/** Set the approximate total number of rows to read.
*/
void setTotalRowsApprox(size_t value) { total_rows_approx = value; }
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
@ -180,15 +182,17 @@ protected:
Block totals;
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
void addChild(BlockInputStreamPtr & child)
{
std::lock_guard lock(children_mutex);
children.push_back(child);
}
private:
bool enabled_extremes = false;
/// Information about the approximate total number of rows is collected in the parent source.
std::atomic_bool collected_total_rows_approx {false};
/// The limit on the number of rows/bytes has been exceeded, and you need to stop execution on the next `read` call, as if the thread has run out.
bool limit_exceeded_need_break = false;
@ -199,6 +203,9 @@ private:
QuotaForIntervals * quota = nullptr; /// If nullptr - the quota is not used.
double prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// The successors must implement this function.
virtual Block readImpl() = 0;
@ -217,13 +224,16 @@ private:
bool checkTimeLimits();
void checkQuota(Block & block);
/// Gather information about the approximate total number of rows from all children.
void collectTotalRowsApprox();
/** Send information about the approximate total number of rows to the progress bar.
* It is done so that sending occurs only in the upper stream.
*/
void collectAndSendTotalRowsApprox();
template <typename F>
void forEachProfilingChild(F && f)
{
std::lock_guard lock(children_mutex);
for (auto & child : children)
if (IProfilingBlockInputStream * p_child = dynamic_cast<IProfilingBlockInputStream *>(child.get()))
if (f(*p_child))
return;
}
};
}

View File

@ -43,7 +43,7 @@ InputStreamFromASTInsertQuery::InputStreamFromASTInsertQuery(
input_buffer_contacenated = std::make_unique<ConcatReadBuffer>(buffers);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out_sample, context.getSettings().max_insert_block_size);
res_stream = context.getInputFormat(format, *input_buffer_contacenated, streams.out->getHeader(), context.getSettings().max_insert_block_size);
}
}

View File

@ -27,12 +27,6 @@ public:
String getName() const override { return name; }
void cancel() override
{
std::lock_guard<std::mutex> lock(cancel_mutex);
IProfilingBlockInputStream::cancel();
}
Block getHeader() const override
{
return header;
@ -62,26 +56,7 @@ protected:
input->readPrefix();
{
std::lock_guard<std::mutex> lock(cancel_mutex);
/** TODO Data race here. See IProfilingBlockInputStream::collectAndSendTotalRowsApprox.
Assume following pipeline:
RemoteBlockInputStream
AsynchronousBlockInputStream
LazyBlockInputStream
RemoteBlockInputStream calls AsynchronousBlockInputStream::readPrefix
and AsynchronousBlockInputStream spawns a thread and returns.
The separate thread will call LazyBlockInputStream::read
LazyBlockInputStream::read will add more children to itself
In the same moment, in main thread, RemoteBlockInputStream::read is called,
then IProfilingBlockInputStream::collectAndSendTotalRowsApprox is called
and iterates over set of children.
*/
children.push_back(input);
addChild(input);
if (isCancelled() && p_input)
p_input->cancel();
@ -97,8 +72,6 @@ private:
Generator generator;
BlockInputStreamPtr input;
std::mutex cancel_mutex;
};
}

View File

@ -12,9 +12,10 @@ namespace DB
class MaterializingBlockOutputStream : public IBlockOutputStream
{
public:
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output)
: output{output} {}
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output, const Block & header)
: output{output}, header(header) {}
Block getHeader() const override { return header; }
void write(const Block & block) override { output->write(materializeBlock(block)); }
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
@ -27,6 +28,7 @@ public:
private:
BlockOutputStreamPtr output;
Block header;
};
}

View File

@ -34,29 +34,29 @@ static void removeConstantsFromBlock(Block & block)
}
}
static void removeConstantsFromSortDescription(const Block & sample_block, SortDescription & description)
static void removeConstantsFromSortDescription(const Block & header, SortDescription & description)
{
description.erase(std::remove_if(description.begin(), description.end(),
[&](const SortColumnDescription & elem)
{
if (!elem.column_name.empty())
return sample_block.getByName(elem.column_name).column->isColumnConst();
return header.getByName(elem.column_name).column->isColumnConst();
else
return sample_block.safeGetByPosition(elem.column_number).column->isColumnConst();
return header.safeGetByPosition(elem.column_number).column->isColumnConst();
}), description.end());
}
/** Add into block, whose constant columns was removed by previous function,
* constant columns from sample_block (which must have structure as before removal of constants from block).
* constant columns from header (which must have structure as before removal of constants from block).
*/
static void enrichBlockWithConstants(Block & block, const Block & sample_block)
static void enrichBlockWithConstants(Block & block, const Block & header)
{
size_t rows = block.rows();
size_t columns = sample_block.columns();
size_t columns = header.columns();
for (size_t i = 0; i < columns; ++i)
{
const auto & col_type_name = sample_block.getByPosition(i);
const auto & col_type_name = header.getByPosition(i);
if (col_type_name.column->isColumnConst())
block.insert(i, {col_type_name.column->cloneResized(rows), col_type_name.type, col_type_name.name});
}
@ -65,6 +65,12 @@ static void enrichBlockWithConstants(Block & block, const Block & sample_block)
Block MergeSortingBlockInputStream::readImpl()
{
if (!header)
{
header = getHeader();
removeConstantsFromSortDescription(header, description);
}
/** Algorithm:
* - read to memory blocks from source stream;
* - if too much of them and if external sorting is enabled,
@ -77,12 +83,6 @@ Block MergeSortingBlockInputStream::readImpl()
{
while (Block block = children.back()->read())
{
if (!sample_block)
{
sample_block = block.cloneEmpty();
removeConstantsFromSortDescription(sample_block, description);
}
/// If there were only const columns in sort description, then there is no need to sort.
/// Return the blocks as is.
if (description.empty())
@ -103,7 +103,7 @@ Block MergeSortingBlockInputStream::readImpl()
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, block.cloneEmpty());
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
@ -148,7 +148,7 @@ Block MergeSortingBlockInputStream::readImpl()
Block res = impl->read();
if (res)
enrichBlockWithConstants(res, sample_block);
enrichBlockWithConstants(res, header);
return res;
}

View File

@ -108,7 +108,7 @@ private:
/// Before operation, will remove constant columns from blocks. And after, place constant columns back.
/// (to avoid excessive virtual function calls and because constants cannot be serialized in Native format for temporary files)
/// Save original block structure here.
Block sample_block;
Block header;
/// Everything below is for external sorting.
std::vector<std::unique_ptr<Poco::TemporaryFile>> temporary_files;

View File

@ -219,10 +219,10 @@ Block MergingAggregatedMemoryEfficientBlockInputStream::readImpl()
parallel_merge_data->merged_blocks_changed.wait(lock, [this]
{
return parallel_merge_data->finish /// Requested to finish early.
|| parallel_merge_data->exception /// An error in merging thread.
|| parallel_merge_data->exhausted /// No more data in sources.
|| !parallel_merge_data->merged_blocks.empty(); /// Have another merged block.
return parallel_merge_data->finish /// Requested to finish early.
|| parallel_merge_data->exception /// An error in merging thread.
|| parallel_merge_data->exhausted /// No more data in sources.
|| !parallel_merge_data->merged_blocks.empty(); /// Have another merged block.
});
if (parallel_merge_data->exception)
@ -493,7 +493,7 @@ MergingAggregatedMemoryEfficientBlockInputStream::BlocksToMerge MergingAggregate
while (true)
{
if (current_bucket_num == NUM_BUCKETS)
if (current_bucket_num >= NUM_BUCKETS)
{
/// All ordinary data was processed. Maybe, there are also 'overflows'-blocks.
// std::cerr << "at end\n";

View File

@ -84,7 +84,7 @@ protected:
Block readImpl() override;
private:
static constexpr size_t NUM_BUCKETS = 256;
static constexpr int NUM_BUCKETS = 256;
Aggregator aggregator;
bool final;

View File

@ -11,7 +11,6 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int BLOCKS_HAVE_DIFFERENT_STRUCTURE;
}
@ -92,19 +91,7 @@ void MergingSortedBlockInputStream::init(Block & header, MutableColumns & merged
if (!*shared_block_ptr)
continue;
size_t src_columns = shared_block_ptr->columns();
size_t dst_columns = header.columns();
if (src_columns != dst_columns)
throw Exception("Merging blocks have different number of columns ("
+ toString(src_columns) + " and " + toString(dst_columns) + ")",
ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
for (size_t i = 0; i < src_columns; ++i)
if (!blocksHaveEqualStructure(*shared_block_ptr, header))
throw Exception("Merging blocks have different names or types of columns:\n"
+ shared_block_ptr->dumpStructure() + "\nand\n" + header.dumpStructure(),
ErrorCodes::BLOCKS_HAVE_DIFFERENT_STRUCTURE);
assertBlocksHaveEqualStructure(*shared_block_ptr, header, getName());
}
merged_columns.resize(num_columns);

View File

@ -52,7 +52,7 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
for (const auto & column : index_block_it->columns)
{
auto type = DataTypeFactory::instance().get(column.type);
header.insert({ type->createColumn(), type, column.name });
header.insert(ColumnWithTypeAndName{ type, column.name });
}
}

View File

@ -20,9 +20,9 @@ namespace ErrorCodes
NativeBlockOutputStream::NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_),
: ostr(ostr_), client_revision(client_revision_), header(header_),
index_ostr(index_ostr_), initial_size_of_file(initial_size_of_file_)
{
if (index_ostr)

View File

@ -23,9 +23,10 @@ public:
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
WriteBuffer & ostr_, UInt64 client_revision_ = 0,
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
@ -36,7 +37,7 @@ public:
private:
WriteBuffer & ostr;
UInt64 client_revision;
Block header;
WriteBuffer * index_ostr;
size_t initial_size_of_file; /// The initial size of the data file, if `append` done. Used for the index.
/// If you need to write index, then `ostr` must be a CompressedWriteBuffer.

View File

@ -11,7 +11,12 @@ namespace DB
class NullBlockOutputStream : public IBlockOutputStream
{
public:
NullBlockOutputStream(const Block & header) : header(header) {}
Block getHeader() const override { return header; }
void write(const Block &) override {}
private:
Block header;
};
}

View File

@ -1,122 +0,0 @@
#include <DataStreams/NullableAdapterBlockInputStream.h>
#include <Columns/ColumnNullable.h>
#include <Columns/ColumnsCommon.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN;
extern const int TYPE_MISMATCH;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
}
NullableAdapterBlockInputStream::NullableAdapterBlockInputStream(
const BlockInputStreamPtr & input,
const Block & in_sample_, const Block & out_sample_)
: header(out_sample_)
{
buildActions(in_sample_, out_sample_);
children.push_back(input);
}
Block NullableAdapterBlockInputStream::readImpl()
{
Block block = children.back()->read();
if (!block && !must_transform)
return block;
Block res;
size_t s = block.columns();
for (size_t i = 0; i < s; ++i)
{
const auto & elem = block.getByPosition(i);
switch (actions[i])
{
case TO_ORDINARY:
{
const auto & nullable_col = static_cast<const ColumnNullable &>(*elem.column);
const auto & nullable_type = static_cast<const DataTypeNullable &>(*elem.type);
const auto & null_map = nullable_col.getNullMapData();
bool has_nulls = !memoryIsZero(null_map.data(), null_map.size());
if (has_nulls)
throw Exception{"Cannot insert NULL value into non-nullable column",
ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN};
else
res.insert({
nullable_col.getNestedColumnPtr(),
nullable_type.getNestedType(),
rename[i].value_or(elem.name)
});
break;
}
case TO_NULLABLE:
{
ColumnPtr null_map = ColumnUInt8::create(elem.column->size(), 0);
res.insert({
ColumnNullable::create(elem.column, null_map),
std::make_shared<DataTypeNullable>(elem.type),
rename[i].value_or(elem.name)});
break;
}
case NONE:
{
if (rename[i])
res.insert({elem.column, elem.type, *rename[i]});
else
res.insert(elem);
break;
}
}
}
return res;
}
void NullableAdapterBlockInputStream::buildActions(
const Block & in_sample,
const Block & out_sample)
{
size_t in_size = in_sample.columns();
if (out_sample.columns() != in_size)
throw Exception("Number of columns in INSERT SELECT doesn't match", ErrorCodes::NUMBER_OF_COLUMNS_DOESNT_MATCH);
actions.reserve(in_size);
rename.reserve(in_size);
for (size_t i = 0; i < in_size; ++i)
{
const auto & in_elem = in_sample.getByPosition(i);
const auto & out_elem = out_sample.getByPosition(i);
bool is_in_nullable = in_elem.type->isNullable();
bool is_out_nullable = out_elem.type->isNullable();
if (is_in_nullable && !is_out_nullable)
actions.push_back(TO_ORDINARY);
else if (!is_in_nullable && is_out_nullable)
actions.push_back(TO_NULLABLE);
else
actions.push_back(NONE);
if (in_elem.name != out_elem.name)
rename.emplace_back(std::make_optional(out_elem.name));
else
rename.emplace_back();
if (actions.back() != NONE || rename.back())
must_transform = true;
}
}
}

View File

@ -1,60 +0,0 @@
#pragma once
#include <DataStreams/IProfilingBlockInputStream.h>
#include <optional>
namespace DB
{
/// This stream allows perfoming INSERT requests in which the types of
/// the target and source blocks are compatible up to nullability:
///
/// - if a target column is nullable while the corresponding source
/// column is not, we embed the source column into a nullable column;
/// - if a source column is nullable while the corresponding target
/// column is not, we extract the nested column from the source
/// while checking that is doesn't actually contain NULLs;
/// - otherwise we just perform an identity mapping.
class NullableAdapterBlockInputStream : public IProfilingBlockInputStream
{
public:
NullableAdapterBlockInputStream(const BlockInputStreamPtr & input, const Block & in_sample_, const Block & out_sample_);
String getName() const override { return "NullableAdapterBlockInputStream"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
/// Given a column of a block we have just read,
/// how must we process it?
enum Action
{
/// Do nothing.
NONE = 0,
/// Convert nullable column to ordinary column.
TO_ORDINARY,
/// Convert non-nullable column to nullable column.
TO_NULLABLE
};
/// Actions to be taken for each column of a block.
using Actions = std::vector<Action>;
private:
/// Determine the actions to be taken using the source sample block,
/// which describes the columns from which we fetch data inside an INSERT
/// query, and the target sample block which contains the columns
/// we insert data into.
void buildActions(const Block & in_sample, const Block & out_sample);
private:
Block header;
Actions actions;
std::vector<std::optional<String>> rename;
bool must_transform = false;
};
}

View File

@ -7,9 +7,8 @@
namespace DB
{
ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_)
: out(out_)
, sample(sample_)
ODBCDriverBlockOutputStream::ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_)
: out(out_), header(header_)
{
}
@ -43,7 +42,7 @@ void ODBCDriverBlockOutputStream::write(const Block & block)
void ODBCDriverBlockOutputStream::writePrefix()
{
const size_t columns = sample.columns();
const size_t columns = header.columns();
/// Number of columns.
writeVarUInt(columns, out);
@ -51,7 +50,7 @@ void ODBCDriverBlockOutputStream::writePrefix()
/// Names and types of columns.
for (size_t i = 0; i < columns; ++i)
{
const ColumnWithTypeAndName & col = sample.getByPosition(i);
const ColumnWithTypeAndName & col = header.getByPosition(i);
writeStringBinary(col.name, out);
writeStringBinary(col.type->getName(), out);

View File

@ -19,8 +19,9 @@ class WriteBuffer;
class ODBCDriverBlockOutputStream : public IBlockOutputStream
{
public:
ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & sample_);
ODBCDriverBlockOutputStream(WriteBuffer & out_, const Block & header_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writePrefix() override;
@ -29,7 +30,7 @@ public:
private:
WriteBuffer & out;
const Block sample;
const Block header;
};
}

View File

@ -100,7 +100,8 @@ Block ParallelAggregatingBlockInputStream::readImpl()
ParallelAggregatingBlockInputStream::TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path), compressed_in(file_in), block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}
: file_in(path), compressed_in(file_in),
block_in(std::make_shared<NativeBlockInputStream>(compressed_in, ClickHouseRevision::get())) {}

View File

@ -17,8 +17,9 @@ namespace ErrorCodes
}
PrettyBlockOutputStream::PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: ostr(ostr_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_)
PrettyBlockOutputStream::PrettyBlockOutputStream(
WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: ostr(ostr_), header(header_), max_rows(max_rows_), no_escapes(no_escapes_), context(context_)
{
struct winsize w;
if (0 == ioctl(STDOUT_FILENO, TIOCGWINSZ, &w))

View File

@ -17,8 +17,9 @@ class PrettyBlockOutputStream : public IBlockOutputStream
{
public:
/// no_escapes - do not use ANSI escape sequences - to display in the browser, not in the console.
PrettyBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_);
PrettyBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void writeSuffix() override;
@ -32,6 +33,7 @@ protected:
void writeExtremes();
WriteBuffer & ostr;
const Block header;
size_t max_rows;
size_t total_rows = 0;
size_t terminal_width = 0;

View File

@ -11,8 +11,8 @@ namespace DB
class PrettyCompactBlockOutputStream : public PrettyBlockOutputStream
{
public:
PrettyCompactBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {}
PrettyCompactBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {}
void write(const Block & block) override;

View File

@ -11,8 +11,8 @@ namespace DB
class PrettySpaceBlockOutputStream : public PrettyBlockOutputStream
{
public:
PrettySpaceBlockOutputStream(WriteBuffer & ostr_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, no_escapes_, max_rows_, context_) {}
PrettySpaceBlockOutputStream(WriteBuffer & ostr_, const Block & header_, bool no_escapes_, size_t max_rows_, const Context & context_)
: PrettyBlockOutputStream(ostr_, header_, no_escapes_, max_rows_, context_) {}
void write(const Block & block) override;
void writeSuffix() override;

View File

@ -1,23 +0,0 @@
#include <Core/Block.h>
#include <DataStreams/ProhibitColumnsBlockOutputStream.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
}
void ProhibitColumnsBlockOutputStream::write(const Block & block)
{
for (const auto & column : columns)
if (block.has(column.name))
throw Exception{"Cannot insert column " + column.name, ErrorCodes::ILLEGAL_COLUMN};
output->write(block);
}
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <Core/NamesAndTypes.h>
namespace DB
{
/// Throws exception on encountering prohibited column in block
class ProhibitColumnsBlockOutputStream : public IBlockOutputStream
{
public:
ProhibitColumnsBlockOutputStream(const BlockOutputStreamPtr & output, const NamesAndTypesList & columns)
: output{output}, columns{columns}
{
}
private:
void write(const Block & block) override;
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }
BlockOutputStreamPtr output;
NamesAndTypesList columns;
};
}

Some files were not shown because too many files have changed in this diff Show More