Merged with master.

Nikolai Kochetov 2018-08-20 15:25:06 +03:00
commit 391b4b5927
853 changed files with 36705 additions and 3153 deletions

1
.gitignore vendored
View File

@ -178,7 +178,6 @@ utils/zookeeper-create-entry-to-download-part/zookeeper-create-entry-to-download
utils/zookeeper-dump-tree/zookeeper-dump-tree
utils/zookeeper-remove-by-list/zookeeper-remove-by-list
dbms/src/Storages/tests/remove_symlink_directory
dbms/tests/queries/1_stateful
debian/control
debian/copyright
debian/tmp/

2
.gitmodules vendored
View File

@ -15,7 +15,7 @@
url = https://github.com/google/cctz.git
[submodule "contrib/zlib-ng"]
path = contrib/zlib-ng
url = https://github.com/Dead2/zlib-ng.git
url = https://github.com/ClickHouse-Extras/zlib-ng.git
[submodule "contrib/googletest"]
path = contrib/googletest
url = https://github.com/google/googletest.git

View File

@ -3,26 +3,6 @@ language: generic
matrix:
fast_finish: true
include:
# - os: linux
#
# cache:
# ccache: true
# timeout: 1000
#
# addons:
# apt:
# update: true
# sources:
# - ubuntu-toolchain-r-test
# packages: [ g++-7, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo, openssl ]
#
# env:
# - MATRIX_EVAL="export CC=gcc-7 CXX=g++-7"
#
# script:
# - env TEST_RUN= utils/travis/normal.sh
# We need to have gcc7 headers to compile c++17 code on clang
- os: linux
@ -46,28 +26,6 @@ matrix:
script:
- utils/travis/normal.sh
# TODO: fix internal compiler
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-5.0 DEB_CXX=clang++-5.0"
#
# script:
# - utils/travis/pbuilder.sh
- os: linux
sudo: required
@ -85,69 +43,6 @@ matrix:
script:
- utils/travis/pbuilder.sh
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# update: true
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export ARCH=i386"
#
# script:
# - env PBUILDER_TIMEOUT=40m TEST_TRUE=true TEST_RUN= utils/travis/pbuilder.sh
# TODO: Can't bootstrap bionic on trusty host
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# update: true
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-6.0 DEB_CXX=clang++-6.0 DIST=bionic EXTRAPACKAGES='clang-6.0 lld-6.0'"
#
# script:
# - utils/travis/pbuilder.sh
# Can't fit into the time limit (48 min)
# - os: osx
# osx_image: xcode9.2
#
# cache:
# ccache: true
# timeout: 1000
#
# before_install:
# - brew install unixodbc gcc ccache libtool gettext zlib readline double-conversion gperftools google-sparsehash lz4 zstd || true
# - brew link --overwrite gcc || true
#
# env:
# - MATRIX_EVAL="export CC=gcc-8 CXX=g++-8"
#
# script:
# - env CMAKE_FLAGS="-DUSE_INTERNAL_BOOST_LIBRARY=1" utils/travis/normal.sh
allow_failures:
- os: osx

View File

@ -1 +1,31 @@
## RU
## ClickHouse release 18.10.3, 2018-08-13
### New features:
* Support for inter-server replication over HTTPS
* MurmurHash
* ODBCDriver2 with support for NULLs
* Support for UUID in key columns (experimental)
### Improvements:
* Added SETTINGS support for the Kafka engine
* Support for empty parts after merges in the Summing, Collapsing and VersionedCollapsing engines
* Old records of completely executed mutations are removed
* Fixed REPLACE PARTITION logic for the ReplicatedMergeTree engine
* Added the system.merge_tree_settings system table
* Added dependency columns to the system.tables system table: dependencies_database and dependencies_table
* Replaced the allocator: jemalloc is now used instead of tcmalloc
* Improved validation of ODBC connection strings
* Removed CHECK TABLE support for Distributed tables
* Added stateful tests (so far without data)
* Added the max_partition_size_to_drop config option
* Added the output_format_json_escape_slashes setting
* Added the max_fetch_partition_retries_count setting
* Added the prefer_localhost_replica setting
* Added libressl, unixodbc and mariadb-connector-c as submodules
### Bug fixes:
* #2786
* #2777
* #2795

View File

@ -36,6 +36,10 @@
* Fixed a bug in the `anyHeavy` aggregate function ([a2101df2](https://github.com/yandex/ClickHouse/commit/a2101df25a6a0fba99aa71f8793d762af2b801ee))
* Fixed server crash when using the `countArray()` aggregate function.
### Backward incompatible changes:
* Parameters for the `Kafka` engine were changed from `Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_schema, kafka_num_consumers])` to `Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])`. If your tables use the `kafka_schema` or `kafka_num_consumers` parameters, you have to manually edit the metadata files `path/metadata/database/table.sql` and add the `kafka_row_delimiter` parameter with the value `''`.
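For example, a hypothetical metadata file line `ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow', 'schema.capnp:Message', 2)` would become `ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow', '', 'schema.capnp:Message', 2)`, with the empty `kafka_row_delimiter` value inserted before `kafka_schema` (broker, topic, and schema names here are illustrative).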
## ClickHouse release 18.1.0, 2018-07-23
### New features:

View File

@ -43,6 +43,10 @@
* Fixed a bug in the `anyHeavy` aggregate function ([a2101df2](https://github.com/yandex/ClickHouse/commit/a2101df25a6a0fba99aa71f8793d762af2b801ee))
* Fixed a server crash when using the `countArray()` function.
### Backward incompatible changes:
* The parameter list for `Kafka` tables was changed from `Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_schema, kafka_num_consumers])` to `Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])`. If you used the `kafka_schema` or `kafka_num_consumers` parameters, you must manually edit the metadata files `path/metadata/database/table.sql`, adding the `kafka_row_delimiter` parameter with the value `''` in the appropriate place.
## ClickHouse release 18.1.0, 2018-07-23

View File

@ -1,5 +1,5 @@
project (ClickHouse)
cmake_minimum_required (VERSION 2.8)
cmake_minimum_required (VERSION 3.3)
set(CMAKE_MODULE_PATH ${CMAKE_MODULE_PATH} "${ClickHouse_SOURCE_DIR}/cmake/Modules/")
@ -34,10 +34,9 @@ endif ()
string(TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
message (STATUS "CMAKE_BUILD_TYPE: " ${CMAKE_BUILD_TYPE} )
# ASan - build type with address sanitizer
# UBSan - build type with undefined behaviour sanitizer
# TSan is not supported due to false positive errors in libstdc++ and necessity to rebuild libstdc++ with TSan
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel;ASan;UBSan" CACHE STRING "" FORCE)
set (CMAKE_CONFIGURATION_TYPES "RelWithDebInfo;Debug;Release;MinSizeRel" CACHE STRING "" FORCE)
include (cmake/sanitize.cmake)
include (cmake/arch.cmake)
@ -61,12 +60,8 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument")
endif ()
if (ARCH_LINUX)
set (CXX11_ABI "ENABLE" CACHE STRING "Use C++11 ABI: DEFAULT, ENABLE, DISABLE")
endif ()
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (ENABLE_TESTS "Enables tests" ${NOT_MSVC})
option (ENABLE_TESTS "Enables tests" ON)
option (USE_STATIC_LIBRARIES "Set to FALSE to use shared libraries" ON)
option (MAKE_STATIC_LIBRARIES "Set to FALSE to make shared libraries" ${USE_STATIC_LIBRARIES})
@ -86,7 +81,7 @@ endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "amd64.*|x86_64.*|AMD64.*")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (ARCH_LINUX)
if (OS_LINUX)
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
endif()
endif ()
@ -95,15 +90,7 @@ if (GLIBC_COMPATIBILITY)
set (USE_INTERNAL_MEMCPY ON)
endif ()
if (CXX11_ABI STREQUAL ENABLE)
set (CXX11_ABI_FLAGS "-D_GLIBCXX_USE_CXX11_ABI=1")
elseif (CXX11_ABI STREQUAL DISABLE)
set (CXX11_ABI_FLAGS "-D_GLIBCXX_USE_CXX11_ABI=0")
else ()
set (CXX11_ABI_FLAGS "")
endif ()
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${CXX11_ABI_FLAGS}")
set (COMPILER_FLAGS "${COMPILER_FLAGS}")
string(REGEX MATCH "-?[0-9]+(.[0-9]+)?$" COMPILER_POSTFIX ${CMAKE_CXX_COMPILER})
@ -150,17 +137,17 @@ else ()
endif ()
set (CMAKE_BUILD_COLOR_MAKEFILE ON)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMPILER_FLAGS} ${PLATFORM_EXTRA_CXX_FLAG} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CXX_WARNING_FLAGS} ${GLIBC_COMPATIBILITY_COMPILE_FLAGS}")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMPILER_FLAGS} ${PLATFORM_EXTRA_CXX_FLAG} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CXX_WARNING_FLAGS}")
#set (CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3 ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g3 -ggdb3 -fno-inline ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${GLIBC_COMPATIBILITY_COMPILE_FLAGS} ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CMAKE_C_FLAGS_ADD}")
#set (CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3 ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -g3 -ggdb3 -fno-inline ${CMAKE_C_FLAGS_ADD}")
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND ARCH_FREEBSD))
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND OS_FREEBSD))
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
# Along with executables, we also build example of shared library for "library dictionary source"; and it also should be self-contained.
@ -171,8 +158,8 @@ set(THREADS_PREFER_PTHREAD_FLAG ON)
include (cmake/test_compiler.cmake)
if (ARCH_LINUX AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${GLIBC_COMPATIBILITY_LINK_FLAGS} ${CXX11_ABI_FLAGS}")
if (OS_LINUX AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX})
set (LIBCXX_PATH "" CACHE STRING "Use custom path for libc++. It should be used for MSan.")
@ -187,7 +174,7 @@ if (ARCH_LINUX AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
endif ()
if (LIBCXX_PATH)
# include_directories (BEFORE SYSTEM "${LIBCXX_PATH}/include" "${LIBCXX_PATH}/include/c++/v1")
# include_directories (SYSTEM BEFORE "${LIBCXX_PATH}/include" "${LIBCXX_PATH}/include/c++/v1")
link_directories ("${LIBCXX_PATH}/lib")
endif ()
endif ()
@ -202,8 +189,6 @@ if (NOT MAKE_STATIC_LIBRARIES)
set(CMAKE_POSITION_INDEPENDENT_CODE ON)
endif ()
include (cmake/sanitize.cmake)
# Using "include-what-you-use" tool.
option (USE_INCLUDE_WHAT_YOU_USE "Use 'include-what-you-use' tool" OFF)
if (USE_INCLUDE_WHAT_YOU_USE)
@ -233,14 +218,14 @@ else ()
set (CLICKHOUSE_ETC_DIR "${CMAKE_INSTALL_PREFIX}/etc")
endif ()
option (UNBUNDLED "Try find all libraries in system (if fail - use bundled from contrib/)" OFF)
option (UNBUNDLED "Try find all libraries in system. We recommend to avoid this mode for production builds, because we cannot guarantee exact versions and variants of libraries your system has installed. This mode exists for enthusiastic developers who search for trouble. Also it is useful for maintainers of OS packages." OFF)
if (UNBUNDLED)
set(NOT_UNBUNDLED 0)
else ()
set(NOT_UNBUNDLED 1)
endif ()
# Using system libs can cause lot of warnings in includes.
if (UNBUNDLED OR NOT (ARCH_LINUX OR APPLE) OR ARCH_32)
if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
option (NO_WERROR "Disable -Werror compiler option" ON)
endif ()
@ -249,19 +234,15 @@ message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE
include(GNUInstallDirs)
include (cmake/find_ssl.cmake)
if (NOT OPENSSL_FOUND)
message (FATAL_ERROR "Need openssl for build. debian tip: sudo apt install libssl-dev")
endif ()
include (cmake/lib_name.cmake)
include (cmake/find_icu4c.cmake)
include (cmake/find_boost.cmake)
# openssl, zlib before poco
include (cmake/find_zlib.cmake)
include (cmake/find_zstd.cmake)
include (cmake/find_ltdl.cmake) # for odbc
include (cmake/find_termcap.cmake)
include (cmake/find_odbc.cmake)
# openssl, zlib, odbc before poco
include (cmake/find_poco.cmake)
include (cmake/find_lz4.cmake)
include (cmake/find_sparsehash.cmake)

View File

@ -3,7 +3,6 @@ set -e -x
source default-config
./install-os-packages.sh libssl-dev
./install-os-packages.sh libicu-dev
./install-os-packages.sh libreadline-dev

View File

@ -43,9 +43,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO apt-get install -y jq
;;
libssl-dev)
$SUDO apt-get install -y libssl-dev
;;
libicu-dev)
$SUDO apt-get install -y libicu-dev
;;
@ -91,9 +88,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO yum install -y jq
;;
libssl-dev)
$SUDO yum install -y openssl-devel
;;
libicu-dev)
$SUDO yum install -y libicu-devel
;;
@ -133,9 +127,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO pkg install -y jq
;;
libssl-dev)
$SUDO pkg install -y openssl
;;
libicu-dev)
$SUDO pkg install -y icu
;;

View File

@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_TCMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0"
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_JEMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0"
[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin

View File

@ -15,8 +15,8 @@
# ODBC_LIBRARIES, the libraries to link against to use ODBC
# ODBC_FOUND. If false, you cannot build anything that requires ODBC.
option (ENABLE_ODBC "Enable ODBC" ${ARCH_LINUX})
if (ARCH_LINUX)
option (ENABLE_ODBC "Enable ODBC" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" OFF)

View File

@ -7,16 +7,16 @@ endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "i386")
set (ARCH_I386 1)
endif ()
if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386)
if ((ARCH_ARM AND NOT ARCH_AARCH64) OR ARCH_I386)
set (ARCH_32 1)
message (WARNING "Support for 32bit platforms is highly experimental")
message (FATAL_ERROR "32bit platforms are not supported")
endif ()
if (CMAKE_SYSTEM MATCHES "Linux")
set (ARCH_LINUX 1)
set (OS_LINUX 1)
endif ()
if (CMAKE_SYSTEM MATCHES "FreeBSD")
set (ARCH_FREEBSD 1)
set (OS_FREEBSD 1)
endif ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")

View File

@ -1,4 +1,4 @@
option (ENABLE_CAPNP "Enable Cap'n Proto" ${NOT_MSVC})
option (ENABLE_CAPNP "Enable Cap'n Proto" ON)
if (ENABLE_CAPNP)
# cmake 3.5.1 bug:

View File

@ -2,7 +2,7 @@
# TODO: test new libcpuid - maybe already fixed
if (NOT ARCH_ARM)
if (ARCH_FREEBSD)
if (OS_FREEBSD)
set (DEFAULT_USE_INTERNAL_CPUID_LIBRARY 1)
else ()
set (DEFAULT_USE_INTERNAL_CPUID_LIBRARY ${NOT_UNBUNDLED})

View File

@ -1,4 +1,4 @@
if (ARCH_FREEBSD)
if (OS_FREEBSD)
find_library (EXECINFO_LIBRARY execinfo)
find_library (ELF_LIBRARY elf)
message (STATUS "Using execinfo: ${EXECINFO_LIBRARY}")

View File

@ -43,6 +43,12 @@ if (ENABLE_EMBEDDED_COMPILER)
else()
set (USE_EMBEDDED_COMPILER 0)
endif()
if (LLVM_FOUND AND OS_LINUX AND USE_LIBCXX)
message(WARNING "Option USE_INTERNAL_LLVM_LIBRARY is not set, but the LLVM library from OS packages on Linux is incompatible with the libc++ ABI. LLVM will be disabled.")
set (LLVM_FOUND 0)
set (USE_EMBEDDED_COMPILER 0)
endif ()
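# A possible workaround, assuming the option named in the warning above:
# configure with -DUSE_INTERNAL_LLVM_LIBRARY=ON so the bundled LLVM is built
# against the same libc++ and the embedded compiler stays enabled.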
else()
set (LLVM_FOUND 1)
set (USE_EMBEDDED_COMPILER 1)

View File

@ -15,8 +15,8 @@
# ODBC_LIBRARIES, the libraries to link against to use ODBC
# ODBC_FOUND. If false, you cannot build anything that requires ODBC.
option (ENABLE_ODBC "Enable ODBC" ${ARCH_LINUX})
if (ARCH_LINUX)
option (ENABLE_ODBC "Enable ODBC" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" OFF)

View File

@ -92,8 +92,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
endif ()
endif ()
# TODO! fix internal ssl
if (OPENSSL_FOUND AND NOT USE_INTERNAL_SSL_LIBRARY AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
if (OPENSSL_FOUND AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
set (Poco_NetSSL_LIBRARY PocoNetSSL)
set (Poco_Crypto_LIBRARY PocoCrypto)
endif ()

View File

@ -1,8 +1,10 @@
option (ENABLE_RDKAFKA "Enable kafka" ${NOT_MSVC})
option (ENABLE_RDKAFKA "Enable kafka" ON)
if (ENABLE_RDKAFKA)
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
if (OS_LINUX)
option (USE_INTERNAL_RDKAFKA_LIBRARY "Set to FALSE to use system librdkafka instead of the bundled" ${NOT_UNBUNDLED})
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/librdkafka/CMakeLists.txt")
message (WARNING "submodule contrib/librdkafka is missing. To fix, try running: \n git submodule update --init --recursive")
@ -13,7 +15,7 @@ endif ()
if (NOT USE_INTERNAL_RDKAFKA_LIBRARY)
find_library (RDKAFKA_LIB rdkafka)
find_path (RDKAFKA_INCLUDE_DIR NAMES librdkafka/rdkafka.h PATHS ${RDKAFKA_INCLUDE_PATHS})
if (USE_STATIC_LIBRARIES AND NOT ARCH_FREEBSD)
if (USE_STATIC_LIBRARIES AND NOT OS_FREEBSD)
find_library (SASL2_LIBRARY sasl2)
endif ()
endif ()

View File

@ -1,7 +1,7 @@
if (APPLE)
# lib from libs/libcommon
set (RT_LIBRARY "apple_rt")
elseif (ARCH_FREEBSD)
elseif (OS_FREEBSD)
find_library (RT_LIBRARY rt)
else ()
set (RT_LIBRARY "")

View File

@ -1,4 +1,4 @@
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${MSVC})
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${OS_LINUX})
set (OPENSSL_USE_STATIC_LIBS ${USE_STATIC_LIBRARIES})

View File

@ -1,4 +1,6 @@
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
if (NOT OS_FREEBSD AND NOT APPLE)
option (USE_INTERNAL_ZLIB_LIBRARY "Set to FALSE to use system zlib library instead of bundled" ${NOT_UNBUNDLED})
endif ()
if (NOT USE_INTERNAL_ZLIB_LIBRARY)
find_package (ZLIB)
@ -17,7 +19,7 @@ if (NOT ZLIB_FOUND)
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_COMPAT 1) # for zlib-ng, also enables WITH_GZFILEOP
set (WITH_NATIVE_INSTRUCTIONS ${ARCHNATIVE})
if (ARCH_FREEBSD OR ARCH_I386)
if (OS_FREEBSD OR ARCH_I386)
set (WITH_OPTIM 0 CACHE INTERNAL "") # Bug in assembler
endif ()
if (ARCH_AARCH64)

View File

@ -1,27 +1,37 @@
option (SANITIZE "Enable sanitizer: address, memory, thread, undefined" "")
set (SAN_FLAGS "${SAN_FLAGS} -g -fno-omit-frame-pointer -DSANITIZER")
if (SAN_DEBUG)
set (SAN_FLAGS "${SAN_FLAGS} -O0")
else ()
set (SAN_FLAGS "${SAN_FLAGS} -O3")
endif ()
set (CMAKE_CXX_FLAGS_ASAN "${CMAKE_CXX_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_C_FLAGS_ASAN "${CMAKE_C_FLAGS_ASAN} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_EXE_LINKER_FLAGS_ASAN "${CMAKE_EXE_LINKER_FLAGS_ASAN} -fsanitize=address")
set (CMAKE_CXX_FLAGS_UBSAN "${CMAKE_CXX_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_C_FLAGS_UBSAN "${CMAKE_C_FLAGS_UBSAN} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_EXE_LINKER_FLAGS_UBSAN "${CMAKE_EXE_LINKER_FLAGS_UBSAN} -fsanitize=undefined")
set (CMAKE_CXX_FLAGS_MSAN "${CMAKE_CXX_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_C_FLAGS_MSAN "${CMAKE_C_FLAGS_MSAN} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_EXE_LINKER_FLAGS_MSAN "${CMAKE_EXE_LINKER_FLAGS_MSAN} -fsanitize=memory")
set (CMAKE_CXX_FLAGS_TSAN "${CMAKE_CXX_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_C_FLAGS_TSAN "${CMAKE_C_FLAGS_TSAN} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS_TSAN "${CMAKE_EXE_LINKER_FLAGS_TSAN} -fsanitize=thread")
# clang use static linking by default
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS_ASAN "${CMAKE_EXE_LINKER_FLAGS_ASAN} -static-libasan")
set (CMAKE_EXE_LINKER_FLAGS_UBSAN "${CMAKE_EXE_LINKER_FLAGS_UBSAN} -static-libubsan")
set (CMAKE_EXE_LINKER_FLAGS_MSAN "${CMAKE_EXE_LINKER_FLAGS_MSAN} -static-libmsan")
set (CMAKE_EXE_LINKER_FLAGS_TSAN "${CMAKE_EXE_LINKER_FLAGS_TSAN} -static-libtsan")
endif ()
if (SANITIZE)
if (SANITIZE STREQUAL "address")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=address")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address")
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libasan")
endif ()
elseif (SANITIZE STREQUAL "memory")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=memory")
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libmsan")
endif ()
elseif (SANITIZE STREQUAL "thread")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan")
endif ()
elseif (SANITIZE STREQUAL "undefined")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=undefined")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libubsan")
endif ()
else ()
message (FATAL_ERROR "Unknown sanitizer type: ${SANITIZE}")
endif ()
endif()
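
With this rewrite, a sanitizer is selected at configure time through the SANITIZE variable instead of through dedicated build types; a minimal invocation sketch (build-directory layout assumed):

cmake -DSANITIZE=address ..
cmake -DSANITIZE=undefined ..

Any other value now stops the configuration with "Unknown sanitizer type".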

View File

@ -1,10 +1,10 @@
# Third-party libraries may have substandard code.
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-maybe-uninitialized -Wno-format")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-maybe-uninitialized -Wno-format -std=c++1z")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-maybe-uninitialized -Wno-format -Wno-misleading-indentation -Wno-stringop-overflow")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-but-set-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-maybe-uninitialized -Wno-format -Wno-misleading-indentation -Wno-implicit-fallthrough -std=c++1z")
elseif (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-format")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-format -Wno-parentheses-equality")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wno-old-style-cast -Wno-unused-function -Wno-unused-variable -Wno-unused-result -Wno-deprecated-declarations -Wno-non-virtual-dtor -Wno-format -std=c++1z")
endif ()
@ -96,9 +96,10 @@ if (USE_INTERNAL_SSL_LIBRARY)
set (BUILD_SHARED 1)
endif ()
set (USE_SHARED ${USE_STATIC_LIBRARIES})
set (LIBRESSL_SKIP_INSTALL 1)
add_subdirectory (ssl)
target_include_directories(${OPENSSL_CRYPTO_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_SSL_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_CRYPTO_LIBRARY} SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_SSL_LIBRARY} SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()
if (ENABLE_MYSQL AND USE_INTERNAL_MYSQL_LIBRARY)
@ -108,26 +109,9 @@ if (ENABLE_MYSQL AND USE_INTERNAL_MYSQL_LIBRARY)
endif ()
if (USE_INTERNAL_RDKAFKA_LIBRARY)
set (RDKAFKA_BUILD_EXAMPLES OFF CACHE INTERNAL "")
set (RDKAFKA_BUILD_TESTS OFF CACHE INTERNAL "")
set (RDKAFKA_BUILD_STATIC ${MAKE_STATIC_LIBRARIES} CACHE INTERNAL "")
mark_as_advanced (ZLIB_INCLUDE_DIR)
if (USE_INTERNAL_SSL_LIBRARY)
if (MAKE_STATIC_LIBRARIES)
add_library(bundled-ssl ALIAS ${OPENSSL_SSL_LIBRARY})
set (WITH_BUNDLED_SSL 1 CACHE INTERNAL "")
else ()
set (WITH_SSL 0 CACHE INTERNAL "")
endif ()
endif ()
add_subdirectory (librdkafka)
if (USE_INTERNAL_SSL_LIBRARY AND MAKE_STATIC_LIBRARIES)
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
endif ()
add_subdirectory (librdkafka-cmake)
target_include_directories(rdkafka PRIVATE BEFORE ${ZLIB_INCLUDE_DIR})
target_include_directories(rdkafka PRIVATE BEFORE ${OPENSSL_INCLUDE_DIR})
endif ()
if (ENABLE_ODBC AND USE_INTERNAL_ODBC_LIBRARY)
@ -150,11 +134,6 @@ if (USE_INTERNAL_POCO_LIBRARY)
set (_save ${ENABLE_TESTS})
set (ENABLE_TESTS 0)
set (CMAKE_DISABLE_FIND_PACKAGE_ZLIB 1)
if (USE_INTERNAL_SSL_LIBRARY OR (DEFINED ENABLE_POCO_NETSSL AND NOT ENABLE_POCO_NETSSL))
set (DISABLE_INTERNAL_OPENSSL 1 CACHE INTERNAL "")
set (ENABLE_NETSSL 0 CACHE INTERNAL "") # TODO!
set (ENABLE_CRYPTO 0 CACHE INTERNAL "") # TODO!
endif ()
if (MSVC)
set (ENABLE_DATA_ODBC 0 CACHE INTERNAL "") # TODO (build fail)
endif ()
@ -166,7 +145,7 @@ if (USE_INTERNAL_POCO_LIBRARY)
if (OPENSSL_FOUND AND TARGET Crypto AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
# Bug in poco https://github.com/pocoproject/poco/pull/2100 found on macos
target_include_directories(Crypto PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(Crypto SYSTEM PUBLIC ${OPENSSL_INCLUDE_DIR})
endif ()
endif ()

View File

@ -42,9 +42,9 @@ ${LIBRARY_DIR}/libs/filesystem/src/windows_file_codecvt.cpp)
add_library(boost_system_internal
${LIBRARY_DIR}/libs/system/src/error_code.cpp)
target_include_directories (boost_program_options_internal BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_filesystem_internal BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_system_internal BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_program_options_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_filesystem_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_include_directories (boost_system_internal SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
target_compile_definitions (boost_program_options_internal PUBLIC BOOST_SYSTEM_NO_DEPRECATED)
target_compile_definitions (boost_filesystem_internal PUBLIC BOOST_SYSTEM_NO_DEPRECATED)

View File

@ -96,7 +96,8 @@
/*
* Defined if secure_getenv(3) is available.
*/
#define JEMALLOC_HAVE_SECURE_GETENV
// Don't want dependency on newer GLIBC
//#define JEMALLOC_HAVE_SECURE_GETENV
/*
* Defined if issetugid(2) is available.

View File

@ -17,4 +17,4 @@ include/libcpuid/recog_amd.h
include/libcpuid/recog_intel.h
)
target_include_directories (cpuid PUBLIC include)
target_include_directories (cpuid SYSTEM PUBLIC include)

View File

@ -0,0 +1,60 @@
set(RDKAFKA_SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/librdkafka/src)
set(SRCS
${RDKAFKA_SOURCE_DIR}/crc32c.c
${RDKAFKA_SOURCE_DIR}/rdaddr.c
${RDKAFKA_SOURCE_DIR}/rdavl.c
${RDKAFKA_SOURCE_DIR}/rdbuf.c
${RDKAFKA_SOURCE_DIR}/rdcrc32.c
${RDKAFKA_SOURCE_DIR}/rdkafka.c
${RDKAFKA_SOURCE_DIR}/rdkafka_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_broker.c
${RDKAFKA_SOURCE_DIR}/rdkafka_buf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_cgrp.c
${RDKAFKA_SOURCE_DIR}/rdkafka_conf.c
${RDKAFKA_SOURCE_DIR}/rdkafka_event.c
${RDKAFKA_SOURCE_DIR}/rdkafka_feature.c
${RDKAFKA_SOURCE_DIR}/rdkafka_lz4.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata.c
${RDKAFKA_SOURCE_DIR}/rdkafka_metadata_cache.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msg.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_reader.c
${RDKAFKA_SOURCE_DIR}/rdkafka_msgset_writer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_offset.c
${RDKAFKA_SOURCE_DIR}/rdkafka_op.c
${RDKAFKA_SOURCE_DIR}/rdkafka_partition.c
${RDKAFKA_SOURCE_DIR}/rdkafka_pattern.c
${RDKAFKA_SOURCE_DIR}/rdkafka_queue.c
${RDKAFKA_SOURCE_DIR}/rdkafka_range_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_request.c
${RDKAFKA_SOURCE_DIR}/rdkafka_roundrobin_assignor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl.c
${RDKAFKA_SOURCE_DIR}/rdkafka_sasl_plain.c
${RDKAFKA_SOURCE_DIR}/rdkafka_subscription.c
${RDKAFKA_SOURCE_DIR}/rdkafka_timer.c
${RDKAFKA_SOURCE_DIR}/rdkafka_topic.c
${RDKAFKA_SOURCE_DIR}/rdkafka_transport.c
${RDKAFKA_SOURCE_DIR}/rdkafka_interceptor.c
${RDKAFKA_SOURCE_DIR}/rdkafka_header.c
${RDKAFKA_SOURCE_DIR}/rdlist.c
${RDKAFKA_SOURCE_DIR}/rdlog.c
${RDKAFKA_SOURCE_DIR}/rdmurmur2.c
${RDKAFKA_SOURCE_DIR}/rdports.c
${RDKAFKA_SOURCE_DIR}/rdrand.c
${RDKAFKA_SOURCE_DIR}/rdregex.c
${RDKAFKA_SOURCE_DIR}/rdstring.c
${RDKAFKA_SOURCE_DIR}/rdunittest.c
${RDKAFKA_SOURCE_DIR}/rdvarint.c
${RDKAFKA_SOURCE_DIR}/snappy.c
${RDKAFKA_SOURCE_DIR}/tinycthread.c
${RDKAFKA_SOURCE_DIR}/xxhash.c
${RDKAFKA_SOURCE_DIR}/lz4.c
${RDKAFKA_SOURCE_DIR}/lz4frame.c
${RDKAFKA_SOURCE_DIR}/lz4hc.c
${RDKAFKA_SOURCE_DIR}/rdgz.c
)
add_library(rdkafka STATIC ${SRCS})
target_include_directories(rdkafka PRIVATE include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR})
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})

View File

@ -0,0 +1,74 @@
// Automatically generated by ./configure
#ifndef _CONFIG_H_
#define _CONFIG_H_
#define ARCH "x86_64"
#define CPU "generic"
#define WITHOUT_OPTIMIZATION 0
#define ENABLE_DEVEL 0
#define ENABLE_VALGRIND 0
#define ENABLE_REFCNT_DEBUG 0
#define ENABLE_SHAREDPTR_DEBUG 0
#define ENABLE_LZ4_EXT 1
#define ENABLE_SSL 1
//#define ENABLE_SASL 1
#define MKL_APP_NAME "librdkafka"
#define MKL_APP_DESC_ONELINE "The Apache Kafka C/C++ library"
// distro
//#define SOLIB_EXT ".so"
// gcc
//#define WITH_GCC 1
// gxx
//#define WITH_GXX 1
// pkgconfig
//#define WITH_PKGCONFIG 1
// install
//#define WITH_INSTALL 1
// PIC
//#define HAVE_PIC 1
// gnulib
//#define WITH_GNULD 1
// __atomic_32
#define HAVE_ATOMICS_32 1
// __atomic_32
#define HAVE_ATOMICS_32_ATOMIC 1
// atomic_32
#define ATOMIC_OP32(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
// __atomic_64
#define HAVE_ATOMICS_64 1
// __atomic_64
#define HAVE_ATOMICS_64_ATOMIC 1
// atomic_64
#define ATOMIC_OP64(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
// atomic_64
#define ATOMIC_OP(OP1,OP2,PTR,VAL) __atomic_ ## OP1 ## _ ## OP2(PTR, VAL, __ATOMIC_SEQ_CST)
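// Illustration (counter is a hypothetical variable, not part of this file):
// ATOMIC_OP(add, fetch, &counter, 1) expands to
// __atomic_add_fetch(&counter, 1, __ATOMIC_SEQ_CST),
// a sequentially consistent GCC/Clang atomic builtin.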
// parseversion
#define RDKAFKA_VERSION_STR "0.11.4"
// parseversion
#define MKL_APP_VERSION "0.11.4"
// libdl
//#define WITH_LIBDL 1
// WITH_PLUGINS
//#define WITH_PLUGINS 1
// zlib
#define WITH_ZLIB 1
// WITH_SNAPPY
#define WITH_SNAPPY 1
// WITH_SOCKEM
#define WITH_SOCKEM 1
// libssl
#define WITH_SSL 1
// WITH_SASL_SCRAM
//#define WITH_SASL_SCRAM 1
// crc32chw
#define WITH_CRC32C_HW 1
// regex
#define HAVE_REGEX 1
// strndup
#define HAVE_STRNDUP 1
// strerror_r
#define HAVE_STRERROR_R 1
// pthread_setname_gnu
#define HAVE_PTHREAD_SETNAME_GNU 1
// python
//#define HAVE_PYTHON 1
#endif /* _CONFIG_H_ */

View File

@ -0,0 +1 @@
This directory is needed because rdkafka files have #include "../config.h"

View File

@ -57,6 +57,8 @@ ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libmariadb/ma_client_plugin.c
add_library(mysqlclient STATIC ${SRCS})
target_link_libraries(mysqlclient ${OPENSSL_LIBRARIES})
target_include_directories(mysqlclient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/include)
target_include_directories(mysqlclient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/common/include)
target_include_directories(mysqlclient PUBLIC ${MARIADB_CLIENT_SOURCE_DIR}/include)

View File

@ -15,7 +15,7 @@
#define MARIADB_VERSION_ID 100306
#define MYSQL_VERSION_ID 100306
#define MARIADB_PORT 3306
#define MARIADB_UNIX_ADDR "/tmp/mysql.sock"
#define MARIADB_UNIX_ADDR "/var/run/mysqld/mysqld.sock"
#define MYSQL_CONFIG_NAME "my"
#define MARIADB_PACKAGE_VERSION "3.0.6"

View File

@ -1,5 +1,7 @@
add_library(murmurhash
src/murmurhash2.cpp
include/murmurhash2.h)
src/murmurhash3.cpp
include/murmurhash2.h
include/murmurhash3.h)
target_include_directories (murmurhash PUBLIC include)

View File

@ -0,0 +1,37 @@
//-----------------------------------------------------------------------------
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
#ifndef _MURMURHASH3_H_
#define _MURMURHASH3_H_
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER) && (_MSC_VER < 1600)
typedef unsigned char uint8_t;
typedef unsigned int uint32_t;
typedef unsigned __int64 uint64_t;
// Other compilers
#else // defined(_MSC_VER)
#include <stdint.h>
#endif // !defined(_MSC_VER)
//-----------------------------------------------------------------------------
void MurmurHash3_x86_32 ( const void * key, int len, uint32_t seed, void * out );
void MurmurHash3_x86_128 ( const void * key, int len, uint32_t seed, void * out );
void MurmurHash3_x64_128 ( const void * key, int len, uint32_t seed, void * out );
//-----------------------------------------------------------------------------
#endif // _MURMURHASH3_H_
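
A minimal usage sketch for the interface declared above (key and seed values are illustrative):

#include <cstdint>
#include <cstring>
#include <iostream>
#include "murmurhash3.h"

int main()
{
    const char key[] = "ClickHouse";
    const int len = static_cast<int>(std::strlen(key));

    uint32_t h32 = 0;
    MurmurHash3_x86_32(key, len, /*seed=*/0, &h32);    // 32-bit variant

    uint64_t h128[2] = {};
    MurmurHash3_x64_128(key, len, 0, h128);            // 128-bit x64 variant

    std::cout << h32 << ' ' << h128[0] << ' ' << h128[1] << '\n';
}

Note that, as the implementation below warns, the x86 and x64 variants do not produce the same results for the same input.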

View File

@ -0,0 +1,331 @@
// MurmurHash3 was written by Austin Appleby, and is placed in the public
// domain. The author hereby disclaims copyright to this source code.
// Note - The x86 and x64 versions do _not_ produce the same results, as the
// algorithms are optimized for their respective platforms. You can still
// compile and run any of them on any platform, but your performance with the
// non-native version will be less than optimal.
#include "murmurhash3.h"
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
// Microsoft Visual Studio
#if defined(_MSC_VER)
#define FORCE_INLINE __forceinline
#include <stdlib.h>
#define ROTL32(x,y) _rotl(x,y)
#define ROTL64(x,y) _rotl64(x,y)
#define BIG_CONSTANT(x) (x)
// Other compilers
#else // defined(_MSC_VER)
#define FORCE_INLINE inline __attribute__((always_inline))
inline uint32_t rotl32 ( uint32_t x, int8_t r )
{
return (x << r) | (x >> (32 - r));
}
inline uint64_t rotl64 ( uint64_t x, int8_t r )
{
return (x << r) | (x >> (64 - r));
}
#define ROTL32(x,y) rotl32(x,y)
#define ROTL64(x,y) rotl64(x,y)
#define BIG_CONSTANT(x) (x##LLU)
#endif // !defined(_MSC_VER)
//-----------------------------------------------------------------------------
// Block read - if your platform needs to do endian-swapping or can only
// handle aligned reads, do the conversion here
FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
return p[i];
}
FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )
{
return p[i];
}
//-----------------------------------------------------------------------------
// Finalization mix - force all bits of a hash block to avalanche
FORCE_INLINE uint32_t fmix32 ( uint32_t h )
{
h ^= h >> 16;
h *= 0x85ebca6b;
h ^= h >> 13;
h *= 0xc2b2ae35;
h ^= h >> 16;
return h;
}
//----------
FORCE_INLINE uint64_t fmix64 ( uint64_t k )
{
k ^= k >> 33;
k *= BIG_CONSTANT(0xff51afd7ed558ccd);
k ^= k >> 33;
k *= BIG_CONSTANT(0xc4ceb9fe1a85ec53);
k ^= k >> 33;
return k;
}
//-----------------------------------------------------------------------------
void MurmurHash3_x86_32 ( const void * key, int len,
uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
const int nblocks = len / 4;
uint32_t h1 = seed;
const uint32_t c1 = 0xcc9e2d51;
const uint32_t c2 = 0x1b873593;
//----------
// body
const uint32_t * blocks = (const uint32_t *)(data + nblocks*4);
for(int i = -nblocks; i; i++)
{
uint32_t k1 = getblock32(blocks,i);
k1 *= c1;
k1 = ROTL32(k1,15);
k1 *= c2;
h1 ^= k1;
h1 = ROTL32(h1,13);
h1 = h1*5+0xe6546b64;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*4);
uint32_t k1 = 0;
switch(len & 3)
{
case 3: k1 ^= tail[2] << 16;
case 2: k1 ^= tail[1] << 8;
case 1: k1 ^= tail[0];
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
h1 ^= len;
h1 = fmix32(h1);
*(uint32_t*)out = h1;
}
//-----------------------------------------------------------------------------
void MurmurHash3_x86_128 ( const void * key, const int len,
uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
const int nblocks = len / 16;
uint32_t h1 = seed;
uint32_t h2 = seed;
uint32_t h3 = seed;
uint32_t h4 = seed;
const uint32_t c1 = 0x239b961b;
const uint32_t c2 = 0xab0e9789;
const uint32_t c3 = 0x38b34ae5;
const uint32_t c4 = 0xa1e38b93;
//----------
// body
const uint32_t * blocks = (const uint32_t *)(data + nblocks*16);
for(int i = -nblocks; i; i++)
{
uint32_t k1 = getblock32(blocks,i*4+0);
uint32_t k2 = getblock32(blocks,i*4+1);
uint32_t k3 = getblock32(blocks,i*4+2);
uint32_t k4 = getblock32(blocks,i*4+3);
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
h1 = ROTL32(h1,19); h1 += h2; h1 = h1*5+0x561ccd1b;
k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
h2 = ROTL32(h2,17); h2 += h3; h2 = h2*5+0x0bcaa747;
k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
h3 = ROTL32(h3,15); h3 += h4; h3 = h3*5+0x96cd1c35;
k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
h4 = ROTL32(h4,13); h4 += h1; h4 = h4*5+0x32ac3b17;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
uint32_t k1 = 0;
uint32_t k2 = 0;
uint32_t k3 = 0;
uint32_t k4 = 0;
switch(len & 15)
{
case 15: k4 ^= tail[14] << 16;
case 14: k4 ^= tail[13] << 8;
case 13: k4 ^= tail[12] << 0;
k4 *= c4; k4 = ROTL32(k4,18); k4 *= c1; h4 ^= k4;
case 12: k3 ^= tail[11] << 24;
case 11: k3 ^= tail[10] << 16;
case 10: k3 ^= tail[ 9] << 8;
case 9: k3 ^= tail[ 8] << 0;
k3 *= c3; k3 = ROTL32(k3,17); k3 *= c4; h3 ^= k3;
case 8: k2 ^= tail[ 7] << 24;
case 7: k2 ^= tail[ 6] << 16;
case 6: k2 ^= tail[ 5] << 8;
case 5: k2 ^= tail[ 4] << 0;
k2 *= c2; k2 = ROTL32(k2,16); k2 *= c3; h2 ^= k2;
case 4: k1 ^= tail[ 3] << 24;
case 3: k1 ^= tail[ 2] << 16;
case 2: k1 ^= tail[ 1] << 8;
case 1: k1 ^= tail[ 0] << 0;
k1 *= c1; k1 = ROTL32(k1,15); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
h1 ^= len; h2 ^= len; h3 ^= len; h4 ^= len;
h1 += h2; h1 += h3; h1 += h4;
h2 += h1; h3 += h1; h4 += h1;
h1 = fmix32(h1);
h2 = fmix32(h2);
h3 = fmix32(h3);
h4 = fmix32(h4);
h1 += h2; h1 += h3; h1 += h4;
h2 += h1; h3 += h1; h4 += h1;
((uint32_t*)out)[0] = h1;
((uint32_t*)out)[1] = h2;
((uint32_t*)out)[2] = h3;
((uint32_t*)out)[3] = h4;
}
//-----------------------------------------------------------------------------
void MurmurHash3_x64_128 ( const void * key, const int len,
const uint32_t seed, void * out )
{
const uint8_t * data = (const uint8_t*)key;
const int nblocks = len / 16;
uint64_t h1 = seed;
uint64_t h2 = seed;
const uint64_t c1 = BIG_CONSTANT(0x87c37b91114253d5);
const uint64_t c2 = BIG_CONSTANT(0x4cf5ad432745937f);
//----------
// body
const uint64_t * blocks = (const uint64_t *)(data);
for(int i = 0; i < nblocks; i++)
{
uint64_t k1 = getblock64(blocks,i*2+0);
uint64_t k2 = getblock64(blocks,i*2+1);
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
h1 = ROTL64(h1,27); h1 += h2; h1 = h1*5+0x52dce729;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
h2 = ROTL64(h2,31); h2 += h1; h2 = h2*5+0x38495ab5;
}
//----------
// tail
const uint8_t * tail = (const uint8_t*)(data + nblocks*16);
uint64_t k1 = 0;
uint64_t k2 = 0;
switch(len & 15)
{
case 15: k2 ^= ((uint64_t)tail[14]) << 48;
case 14: k2 ^= ((uint64_t)tail[13]) << 40;
case 13: k2 ^= ((uint64_t)tail[12]) << 32;
case 12: k2 ^= ((uint64_t)tail[11]) << 24;
case 11: k2 ^= ((uint64_t)tail[10]) << 16;
case 10: k2 ^= ((uint64_t)tail[ 9]) << 8;
case 9: k2 ^= ((uint64_t)tail[ 8]) << 0;
k2 *= c2; k2 = ROTL64(k2,33); k2 *= c1; h2 ^= k2;
case 8: k1 ^= ((uint64_t)tail[ 7]) << 56;
case 7: k1 ^= ((uint64_t)tail[ 6]) << 48;
case 6: k1 ^= ((uint64_t)tail[ 5]) << 40;
case 5: k1 ^= ((uint64_t)tail[ 4]) << 32;
case 4: k1 ^= ((uint64_t)tail[ 3]) << 24;
case 3: k1 ^= ((uint64_t)tail[ 2]) << 16;
case 2: k1 ^= ((uint64_t)tail[ 1]) << 8;
case 1: k1 ^= ((uint64_t)tail[ 0]) << 0;
k1 *= c1; k1 = ROTL64(k1,31); k1 *= c2; h1 ^= k1;
};
//----------
// finalization
h1 ^= len; h2 ^= len;
h1 += h2;
h2 += h1;
h1 = fmix64(h1);
h2 = fmix64(h2);
h1 += h2;
h2 += h1;
((uint64_t*)out)[0] = h1;
((uint64_t*)out)[1] = h2;
}

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 3a2d0a833a22ef5e1164a9ada54e3253cb038904
Subproject commit 3df947389e6d9654919002797bdd86ed190b3963

View File

@ -12,7 +12,8 @@ endforeach ()
add_library (re2_st ${RE2_ST_SOURCES})
target_compile_definitions (re2_st PRIVATE NDEBUG NO_THREADS re2=re2_st)
target_include_directories (re2_st PRIVATE . PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${RE2_SOURCE_DIR})
target_include_directories (re2_st PRIVATE .)
target_include_directories (re2_st SYSTEM PUBLIC ${CMAKE_CURRENT_BINARY_DIR} ${RE2_SOURCE_DIR})
file (MAKE_DIRECTORY ${CMAKE_CURRENT_BINARY_DIR}/re2_st)
foreach (FILENAME filtered_re2.h re2.h set.h stringpiece.h)

2
contrib/ssl vendored

@ -1 +1 @@
Subproject commit 6fbe1c6f404193989c5f6a63115d80fbe34ce2a3
Subproject commit de02224a42c69e3d8c9112c82018816f821878d0

View File

@ -279,7 +279,8 @@ target_link_libraries(unixodbc ltdl)
# SYSTEM_FILE_PATH was changed to /etc
target_include_directories(unixodbc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64)
target_include_directories(unixodbc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/private)
target_include_directories(unixodbc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64)
target_include_directories(unixodbc PUBLIC ${ODBC_SOURCE_DIR}/include)
target_compile_definitions(unixodbc PRIVATE -DHAVE_CONFIG_H)

2
contrib/zlib-ng vendored

@ -1 +1 @@
Subproject commit e07a52dbaa35d003f5659b221b29d220c091667b
Subproject commit 9173b89d46799582d20a30578e0aa9788bc7d6e1

View File

@ -85,7 +85,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions
add_library(clickhouse_common_io ${SPLIT_SHARED} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (ARCH_FREEBSD)
if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
endif ()
@ -156,7 +156,6 @@ target_link_libraries (dbms
${MYSQLXX_LIBRARY}
${RE2_LIBRARY}
${RE2_ST_LIBRARY}
${OPENSSL_CRYPTO_LIBRARY}
${BTRIE_LIBRARIES}
)
@ -219,6 +218,8 @@ if (USE_RDKAFKA)
endif ()
endif ()
target_link_libraries(dbms ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries (dbms
Threads::Threads
)

View File

@ -1,11 +1,11 @@
# These strings are autochanged by release_lib.sh:
set(VERSION_REVISION 54404 CACHE STRING "")
set(VERSION_REVISION 54406 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 9 CACHE STRING "")
set(VERSION_MINOR 11 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
set(VERSION_GITHASH c83721a02db002eef7ff864f82d53ca89d47f9e6 CACHE STRING "")
set(VERSION_DESCRIBE v18.9.0-testing CACHE STRING "")
set(VERSION_STRING 18.9.0 CACHE STRING "")
set(VERSION_GITHASH 76af46ed5d223b3a7af92e31eae291174da16355 CACHE STRING "")
set(VERSION_DESCRIBE v18.11.0-testing CACHE STRING "")
set(VERSION_STRING 18.11.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")

View File

@ -13,6 +13,7 @@ option (ENABLE_CLICKHOUSE_COMPRESSOR "Enable clickhouse-compressor" ${ENABLE_CLI
option (ENABLE_CLICKHOUSE_COPIER "Enable clickhouse-copier" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_FORMAT "Enable clickhouse-format" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_OBFUSCATOR "Enable clickhouse-obfuscator" ${ENABLE_CLICKHOUSE_ALL})
option (ENABLE_CLICKHOUSE_ODBC_BRIDGE "Enable clickhouse-odbc-bridge" ${ENABLE_CLICKHOUSE_ALL})
configure_file (config_tools.h.in ${CMAKE_CURRENT_BINARY_DIR}/config_tools.h)
@ -27,10 +28,11 @@ add_subdirectory (copier)
add_subdirectory (format)
add_subdirectory (clang)
add_subdirectory (obfuscator)
add_subdirectory (odbc-bridge)
if (CLICKHOUSE_SPLIT_BINARY)
set (CLICKHOUSE_ALL_TARGETS clickhouse-server clickhouse-client clickhouse-local clickhouse-benchmark clickhouse-performance-test
clickhouse-extract-from-config clickhouse-format clickhouse-copier)
clickhouse-extract-from-config clickhouse-compressor clickhouse-format clickhouse-copier clickhouse-odbc-bridge)
if (USE_EMBEDDED_COMPILER)
list (APPEND CLICKHOUSE_ALL_TARGETS clickhouse-clang clickhouse-lld)
@ -83,6 +85,9 @@ else ()
if (USE_EMBEDDED_COMPILER)
target_link_libraries (clickhouse clickhouse-compiler-lib)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
target_link_libraries (clickhouse clickhouse-odbc-bridge-lib)
endif()
set (CLICKHOUSE_BUNDLE)
if (ENABLE_CLICKHOUSE_SERVER)
@ -135,6 +140,12 @@ else ()
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-obfuscator DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-obfuscator)
endif ()
if (ENABLE_CLICKHOUSE_ODBC_BRIDGE)
add_custom_target (clickhouse-odbc-bridge ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-odbc-bridge DEPENDS clickhouse)
install (FILES ${CMAKE_CURRENT_BINARY_DIR}/clickhouse-odbc-bridge DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
list(APPEND CLICKHOUSE_BUNDLE clickhouse-odbc-bridge)
endif ()
# Install always because the debian package wants these files:
add_custom_target (clickhouse-clang ALL COMMAND ${CMAKE_COMMAND} -E create_symlink clickhouse clickhouse-clang DEPENDS clickhouse)

View File

@ -13,10 +13,10 @@ if (CLICKHOUSE_SPLIT_BINARY)
endif ()
endif ()
set(TMP_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/headers")
set(TMP_HEADERS_DIR "${CMAKE_CURRENT_BINARY_DIR}/${INTERNAL_COMPILER_HEADERS_RELATIVE}")
# Make and install empty dir for debian package if compiler disabled
add_custom_target(make-headers-directory ALL COMMAND ${CMAKE_COMMAND} -E make_directory ${TMP_HEADERS_DIR})
install(DIRECTORY ${TMP_HEADERS_DIR} DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/clickhouse COMPONENT clickhouse)
install(DIRECTORY ${TMP_HEADERS_DIR} DESTINATION ${CMAKE_INSTALL_DATAROOTDIR}/clickhouse/${INTERNAL_COMPILER_HEADERS_DIR} COMPONENT clickhouse)
# TODO: fix on macos copy_headers.sh: sed --posix
if (USE_EMBEDDED_COMPILER AND NOT APPLE)
add_custom_target(copy-headers ALL env CLANG=${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-clang BUILD_PATH=${ClickHouse_BINARY_DIR} DESTDIR=${ClickHouse_SOURCE_DIR} ${ClickHouse_SOURCE_DIR}/copy_headers.sh ${ClickHouse_SOURCE_DIR} ${TMP_HEADERS_DIR} DEPENDS clickhouse-clang WORKING_DIRECTORY ${ClickHouse_SOURCE_DIR} SOURCES ${ClickHouse_SOURCE_DIR}/copy_headers.sh)

View File

@ -43,4 +43,7 @@ LLVMSupport
#PollyPPCG
PUBLIC ${ZLIB_LIBRARIES} ${EXECINFO_LIBRARY} Threads::Threads
${MALLOC_LIBRARIES}
${GLIBC_COMPATIBILITY_LIBRARIES}
${MEMCPY_LIBRARIES}
)

View File

@ -43,4 +43,7 @@ ${REQUIRED_LLVM_LIBRARIES}
#PollyPPCG
PUBLIC ${ZLIB_LIBRARIES} ${EXECINFO_LIBRARY} Threads::Threads
${MALLOC_LIBRARIES}
${GLIBC_COMPATIBILITY_LIBRARIES}
${MEMCPY_LIBRARIES}
)

View File

@ -39,4 +39,7 @@ lldCore
${REQUIRED_LLVM_LIBRARIES}
PUBLIC ${ZLIB_LIBRARIES} ${EXECINFO_LIBRARY} Threads::Threads
${MALLOC_LIBRARIES}
${GLIBC_COMPATIBILITY_LIBRARIES}
${MEMCPY_LIBRARIES}
)

View File

@ -1,3 +1,5 @@
#include "TestHint.h"
#include <port/unistd.h>
#include <stdlib.h>
#include <fcntl.h>
@ -20,7 +22,6 @@
#include <Common/Stopwatch.h>
#include <Common/Exception.h>
#include <Common/ShellCommand.h>
#include <Common/ExternalTable.h>
#include <Common/UnicodeBar.h>
#include <Common/formatReadable.h>
#include <Common/NetException.h>
@ -31,6 +32,7 @@
#include <Common/config_version.h>
#include <Core/Types.h>
#include <Core/QueryProcessingStage.h>
#include <Core/ExternalTable.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFile.h>
@ -39,6 +41,7 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/AsynchronousBlockInputStream.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <Parsers/ParserQuery.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/ASTUseQuery.h>
@ -90,102 +93,6 @@ namespace ErrorCodes
}
/// Checks expected server and client error codes in testmode.
/// To enable it, add a special comment after the query: "-- { serverError 60 }" or "-- { clientError 20 }".
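/// For example (hypothetical table name; error code 60 is the one used in the comment above):
///     SELECT * FROM no_such_table; -- { serverError 60 }
/// makes the test expect the server to fail with code 60 instead of succeeding.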
class TestHint
{
public:
TestHint(bool enabled_, const String & query)
: enabled(enabled_),
server_error(0),
client_error(0)
{
if (!enabled_)
return;
size_t pos = query.find("--");
if (pos != String::npos && query.find("--", pos + 2) != String::npos)
return; /// It's not last comment. Hint belongs to commented query.
if (pos != String::npos)
{
pos = query.find('{', pos + 2);
if (pos != String::npos)
{
String hint = query.substr(pos + 1);
pos = hint.find('}');
hint.resize(pos);
parse(hint);
}
}
}
/// @returns true if it's possible to continue without reconnect
bool checkActual(int & actual_server_error, int & actual_client_error,
bool & got_exception, std::unique_ptr<Exception> & last_exception) const
{
if (!enabled)
return true;
if (allErrorsExpected(actual_server_error, actual_client_error))
{
got_exception = false;
last_exception.reset();
actual_server_error = 0;
actual_client_error = 0;
return false;
}
if (lostExpectedError(actual_server_error, actual_client_error))
{
std::cerr << "Success when error expected. It expects server error "
<< server_error << ", client error " << client_error << "." << std::endl;
got_exception = true;
last_exception = std::make_unique<Exception>("Success when error expected", ErrorCodes::LOGICAL_ERROR); /// return error to OS
return false;
}
return true;
}
int serverError() const { return server_error; }
int clientError() const { return client_error; }
private:
bool enabled;
int server_error;
int client_error;
void parse(const String & hint)
{
std::stringstream ss;
ss << hint;
while (!ss.eof())
{
String item;
ss >> item;
if (item.empty())
break;
if (item == "serverError")
ss >> server_error;
else if (item == "clientError")
ss >> client_error;
}
}
bool allErrorsExpected(int actual_server_error, int actual_client_error) const
{
return (server_error || client_error) && (server_error == actual_server_error) && (client_error == actual_client_error);
}
bool lostExpectedError(int actual_server_error, int actual_client_error) const
{
return (server_error && !actual_server_error) || (client_error && !actual_client_error);
}
};
class Client : public Poco::Util::Application
{
public:
@ -204,6 +111,7 @@ private:
bool is_interactive = true; /// Use either readline interface or batch mode.
bool need_render_progress = true; /// Render query execution progress.
bool echo_queries = false; /// Print queries before execution in batch mode.
bool ignore_error = false; /// In case of errors, don't print the error message and continue to the next query. Only applicable in non-interactive mode.
bool print_time_to_stderr = false; /// Output execution time to stderr in batch mode.
bool stdin_is_not_tty = false; /// stdin is not a terminal.
@ -234,6 +142,11 @@ private:
std::optional<WriteBufferFromFile> out_file_buf;
BlockOutputStreamPtr block_out_stream;
/// The user can specify a special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
BlockOutputStreamPtr logs_out_stream;
String home_path;
String current_profile;
@ -407,20 +320,10 @@ private:
/// If exception code isn't zero, we should return non-zero return code anyway.
return e.code() ? e.code() : -1;
}
catch (const Poco::Exception & e)
{
std::cerr << "Poco::Exception: " << e.displayText() << std::endl;
return ErrorCodes::POCO_EXCEPTION;
}
catch (const std::exception & e)
{
std::cerr << "std::exception: " << e.what() << std::endl;
return ErrorCodes::STD_EXCEPTION;
}
catch (...)
{
std::cerr << "Unknown exception" << std::endl;
return ErrorCodes::UNKNOWN_EXCEPTION;
std::cerr << getCurrentExceptionMessage(false) << std::endl;
return getCurrentExceptionCode();
}
}
@ -468,12 +371,18 @@ private:
format_max_block_size = config().getInt("format_max_block_size", context.getSettingsRef().max_block_size);
insert_format = "Values";
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size);
/// A setting value from a command-line argument overrides the one from the config
if (context.getSettingsRef().max_insert_block_size.changed)
insert_format_max_block_size = context.getSettingsRef().max_insert_block_size;
else
insert_format_max_block_size = config().getInt("insert_format_max_block_size", context.getSettingsRef().max_insert_block_size);
if (!is_interactive)
{
need_render_progress = config().getBool("progress", false);
echo_queries = config().getBool("echo", false);
ignore_error = config().getBool("ignore-error", false);
}
connect();
@ -765,7 +674,6 @@ private:
bool process(const String & text)
{
const auto ignore_error = config().getBool("ignore-error", false);
const bool test_mode = config().has("testmode");
if (config().has("multiquery"))
{
@ -781,6 +689,7 @@ private:
{
const char * pos = begin;
ASTPtr ast = parseQuery(pos, end, true);
if (!ast)
{
if (ignore_error)
@ -796,7 +705,7 @@ private:
return true;
}
ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(&*ast);
ASTInsertQuery * insert = typeid_cast<ASTInsertQuery *>(ast.get());
if (insert && insert->data)
{
@ -821,9 +730,10 @@ private:
}
catch (...)
{
actual_client_error = getCurrentExceptionCode();
if (!actual_client_error || actual_client_error != expected_client_error)
std::cerr << "Error on processing query: " << query << std::endl << getCurrentExceptionMessage(true);
last_exception = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
actual_client_error = last_exception->code();
if (!ignore_error && (!actual_client_error || actual_client_error != expected_client_error))
std::cerr << "Error on processing query: " << query << std::endl << last_exception->message();
got_exception = true;
}
@ -988,7 +898,7 @@ private:
/// If structure was received (thus, server has not thrown an exception),
/// send our data with that structure.
sendData(sample);
receivePacket();
receiveEndOfQuery();
}
}
@ -1070,6 +980,11 @@ private:
connection->sendData(block);
processed_rows += block.rows();
/// Check if the server sent a Log packet
auto packet_type = connection->checkPacket();
if (packet_type && *packet_type == Protocol::Server::Log)
receiveAndProcessPacket();
if (!block)
break;
}
@ -1081,18 +996,28 @@ private:
/// Flush all buffers.
void resetOutput()
{
block_out_stream = nullptr;
block_out_stream.reset();
logs_out_stream.reset();
if (pager_cmd)
{
pager_cmd->in.close();
pager_cmd->wait();
}
pager_cmd = nullptr;
if (out_file_buf)
{
out_file_buf->next();
out_file_buf.reset();
}
if (out_logs_buf)
{
out_logs_buf->next();
out_logs_buf.reset();
}
std_out.next();
}
@ -1125,7 +1050,7 @@ private:
continue; /// If there is no new data, continue checking whether the query was cancelled after a timeout.
}
if (!receivePacket())
if (!receiveAndProcessPacket())
break;
}
@ -1136,7 +1061,7 @@ private:
/// Receive a part of the result, or progress info or an exception and process it.
/// Returns true if one should continue receiving packets.
bool receivePacket()
bool receiveAndProcessPacket()
{
Connection::Packet packet = connection->receivePacket();
@ -1167,6 +1092,10 @@ private:
last_exception = std::move(packet.exception);
return false;
case Protocol::Server::Log:
onLogData(packet.block);
return true;
case Protocol::Server::EndOfStream:
onEndOfStream();
return false;
@ -1180,22 +1109,59 @@ private:
/// Receive the block that serves as an example of the structure of table where data will be inserted.
bool receiveSampleBlock(Block & out)
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
while (true)
{
case Protocol::Server::Data:
out = packet.block;
return true;
Connection::Packet packet = connection->receivePacket();
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = std::move(packet.exception);
return false;
switch (packet.type)
{
case Protocol::Server::Data:
out = packet.block;
return true;
default:
throw NetException("Unexpected packet from server (expected Data, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = std::move(packet.exception);
return false;
case Protocol::Server::Log:
onLogData(packet.block);
break;
default:
throw NetException("Unexpected packet from server (expected Data, Exception or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
}
}
/// Process Log packets; exit when Exception or EndOfStream is received
bool receiveEndOfQuery()
{
while (true)
{
Connection::Packet packet = connection->receivePacket();
switch (packet.type)
{
case Protocol::Server::EndOfStream:
onEndOfStream();
return true;
case Protocol::Server::Exception:
onException(*packet.exception);
last_exception = std::move(packet.exception);
return false;
case Protocol::Server::Log:
onLogData(packet.block);
break;
default:
throw NetException("Unexpected packet from server (expected Exception, EndOfStream or Log, got "
+ String(Protocol::Server::toString(packet.type)) + ")", ErrorCodes::UNEXPECTED_PACKET_FROM_SERVER);
}
}
}
@ -1251,6 +1217,38 @@ private:
}
void initLogsOutputStream()
{
if (!logs_out_stream)
{
WriteBuffer * wb = out_logs_buf.get();
if (!out_logs_buf)
{
if (server_logs_file.empty())
{
/// Use stderr by default
out_logs_buf = std::make_unique<WriteBufferFromFileDescriptor>(STDERR_FILENO);
wb = out_logs_buf.get();
}
else if (server_logs_file == "-")
{
/// Use stdout if --server_logs_file=- specified
wb = &std_out;
}
else
{
out_logs_buf = std::make_unique<WriteBufferFromFile>(server_logs_file, DBMS_DEFAULT_BUFFER_SIZE, O_WRONLY | O_APPEND | O_CREAT);
wb = out_logs_buf.get();
}
}
logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb);
logs_out_stream->writePrefix();
}
}
void onData(Block & block)
{
if (written_progress_chars)
@ -1274,6 +1272,14 @@ private:
}
void onLogData(Block & block)
{
initLogsOutputStream();
logs_out_stream->write(block);
logs_out_stream->flush();
}
void onTotals(Block & block)
{
initBlockOutputStream(block);
@ -1434,6 +1440,9 @@ private:
if (block_out_stream)
block_out_stream->writeSuffix();
if (logs_out_stream)
logs_out_stream->writeSuffix();
resetOutput();
if (is_interactive && !written_first_block)
@ -1509,7 +1518,9 @@ public:
ioctl(0, TIOCGWINSZ, &terminal_size);
unsigned line_length = boost::program_options::options_description::m_default_line_length;
namespace po = boost::program_options;
unsigned line_length = po::options_description::m_default_line_length;
unsigned min_description_length = line_length / 2;
if (!stdin_is_not_tty)
{
@ -1517,55 +1528,58 @@ public:
min_description_length = std::min(min_description_length, line_length - 2);
}
#define DECLARE_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) (#NAME, boost::program_options::value<std::string> (), DESCRIPTION)
#define DECLARE_SETTING(TYPE, NAME, DEFAULT, DESCRIPTION) (#NAME, po::value<std::string> (), DESCRIPTION)
/// Main commandline options related to client functionality and all parameters from Settings.
boost::program_options::options_description main_description("Main options", line_length, min_description_length);
po::options_description main_description("Main options", line_length, min_description_length);
main_description.add_options()
("help", "produce help message")
("config-file,c", boost::program_options::value<std::string>(), "config-file path")
("host,h", boost::program_options::value<std::string>()->default_value("localhost"), "server host")
("port", boost::program_options::value<int>()->default_value(9000), "server port")
("config-file,c", po::value<std::string>(), "config-file path")
("host,h", po::value<std::string>()->default_value("localhost"), "server host")
("port", po::value<int>()->default_value(9000), "server port")
("secure,s", "secure")
("user,u", boost::program_options::value<std::string>()->default_value("default"), "user")
("password", boost::program_options::value<std::string>(), "password")
("user,u", po::value<std::string>()->default_value("default"), "user")
("password", po::value<std::string>(), "password")
("ask-password", "ask-password")
("query_id", boost::program_options::value<std::string>(), "query_id")
("query,q", boost::program_options::value<std::string>(), "query")
("database,d", boost::program_options::value<std::string>(), "database")
("pager", boost::program_options::value<std::string>(), "pager")
("query_id", po::value<std::string>(), "query_id")
("query,q", po::value<std::string>(), "query")
("database,d", po::value<std::string>(), "database")
("pager", po::value<std::string>(), "pager")
("multiline,m", "multiline")
("multiquery,n", "multiquery")
("format,f", po::value<std::string>(), "default output format")
("testmode,T", "enable test hints in comments")
("ignore-error", "do not stop processing in multiquery mode")
("format,f", boost::program_options::value<std::string>(), "default output format")
("vertical,E", "vertical output format, same as --format=Vertical or FORMAT Vertical or \\G at end of command")
("time,t", "print query execution time to stderr in non-interactive mode (for benchmarks)")
("stacktrace", "print stack traces of exceptions")
("progress", "print progress even in non-interactive mode")
("version,V", "print version information and exit")
("version-clean", "print version in machine-readable format and exit")
("echo", "in batch mode, print query before execution")
("max_client_network_bandwidth", boost::program_options::value<int>(), "the maximum speed of data exchange over the network for the client in bytes per second.")
("compression", boost::program_options::value<bool>(), "enable or disable compression")
("max_client_network_bandwidth", po::value<int>(), "the maximum speed of data exchange over the network for the client in bytes per second.")
("compression", po::value<bool>(), "enable or disable compression")
("log-level", po::value<std::string>(), "client log level")
("server_logs_file", po::value<std::string>(), "put server logs into specified file")
APPLY_FOR_SETTINGS(DECLARE_SETTING)
;
#undef DECLARE_SETTING
/// Commandline options related to external tables.
boost::program_options::options_description external_description("External tables options");
po::options_description external_description("External tables options");
external_description.add_options()
("file", boost::program_options::value<std::string>(), "data file or - for stdin")
("name", boost::program_options::value<std::string>()->default_value("_data"), "name of the table")
("format", boost::program_options::value<std::string>()->default_value("TabSeparated"), "data format")
("structure", boost::program_options::value<std::string>(), "structure")
("types", boost::program_options::value<std::string>(), "types")
("file", po::value<std::string>(), "data file or - for stdin")
("name", po::value<std::string>()->default_value("_data"), "name of the table")
("format", po::value<std::string>()->default_value("TabSeparated"), "data format")
("structure", po::value<std::string>(), "structure")
("types", po::value<std::string>(), "types")
;
/// Parse main commandline options.
boost::program_options::parsed_options parsed = boost::program_options::command_line_parser(
po::parsed_options parsed = po::command_line_parser(
common_arguments.size(), common_arguments.data()).options(main_description).run();
boost::program_options::variables_map options;
boost::program_options::store(parsed, options);
po::variables_map options;
po::store(parsed, options);
if (options.count("version") || options.count("V"))
{
@ -1573,6 +1587,12 @@ public:
exit(0);
}
if (options.count("version-clean"))
{
std::cout << VERSION_STRING;
exit(0);
}
/// Output of help message.
if (options.count("help")
|| (options.count("host") && options["host"].as<std::string>() == "elp")) /// If user writes -help instead of --help.
@ -1582,14 +1602,17 @@ public:
exit(0);
}
if (options.count("log-level"))
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
size_t number_of_external_tables_with_stdin_source = 0;
for (size_t i = 0; i < external_tables_arguments.size(); ++i)
{
/// Parse commandline options related to external tables.
boost::program_options::parsed_options parsed = boost::program_options::command_line_parser(
po::parsed_options parsed = po::command_line_parser(
external_tables_arguments[i].size(), external_tables_arguments[i].data()).options(external_description).run();
boost::program_options::variables_map external_options;
boost::program_options::store(parsed, external_options);
po::variables_map external_options;
po::store(parsed, external_options);
try
{
@ -1663,6 +1686,8 @@ public:
max_client_network_bandwidth = options["max_client_network_bandwidth"].as<int>();
if (options.count("compression"))
config().setBool("compression", options["compression"].as<bool>());
if (options.count("server_logs_file"))
server_logs_file = options["server_logs_file"].as<std::string>();
}
};
@ -1682,6 +1707,11 @@ int mainEntryClickHouseClient(int argc, char ** argv)
std::cerr << "Bad arguments: " << e.what() << std::endl;
return 1;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << std::endl;
return 1;
}
return client.run();
}
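As a sketch of exercising the new log-forwarding options added above (the query is illustrative; `send_logs_level` is the setting checked on the server side in this changeset):

```bash
# Write server logs for the query to a file, or to stdout with "-".
clickhouse-client --send_logs_level=trace --server_logs_file=server.log -q "SELECT 1"
clickhouse-client --send_logs_level=trace --server_logs_file=- -q "SELECT 1"
```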

View File

@ -0,0 +1,118 @@
#pragma once
#include <memory>
#include <sstream>
#include <iostream>
#include <Core/Types.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// Checks expected server and client error codes in testmode.
/// To enable it, add a special comment after the query: "-- { serverError 60 }" or "-- { clientError 20 }".
class TestHint
{
public:
TestHint(bool enabled_, const String & query)
: enabled(enabled_)
{
if (!enabled_)
return;
/// TODO: This is absolutely wrong. Fragment may be contained inside string literal.
size_t pos = query.find("--");
if (pos != String::npos && query.find("--", pos + 2) != String::npos)
return; /// It's not the last comment. The hint belongs to a commented query. /// TODO Absolutely wrong: there may be a following comment for the next query.
if (pos != String::npos)
{
/// TODO: This is also wrong. The comment may already have ended with a line break.
pos = query.find('{', pos + 2);
if (pos != String::npos)
{
String hint = query.substr(pos + 1);
/// TODO: And this is wrong for the same reason.
pos = hint.find('}');
hint.resize(pos);
parse(hint);
}
}
}
/// @returns true if it's possible to continue without reconnect
bool checkActual(int & actual_server_error, int & actual_client_error,
bool & got_exception, std::unique_ptr<Exception> & last_exception) const
{
if (!enabled)
return true;
if (allErrorsExpected(actual_server_error, actual_client_error))
{
got_exception = false;
last_exception.reset();
actual_server_error = 0;
actual_client_error = 0;
return false;
}
if (lostExpectedError(actual_server_error, actual_client_error))
{
std::cerr << "Success when error expected. It expects server error "
<< server_error << ", client error " << client_error << "." << std::endl;
got_exception = true;
last_exception = std::make_unique<Exception>("Success when error expected", ErrorCodes::LOGICAL_ERROR); /// return error to OS
return false;
}
return true;
}
int serverError() const { return server_error; }
int clientError() const { return client_error; }
private:
bool enabled = false;
int server_error = 0;
int client_error = 0;
void parse(const String & hint)
{
std::stringstream ss;
ss << hint;
while (!ss.eof())
{
String item;
ss >> item;
if (item.empty())
break;
if (item == "serverError")
ss >> server_error;
else if (item == "clientError")
ss >> client_error;
}
}
bool allErrorsExpected(int actual_server_error, int actual_client_error) const
{
return (server_error || client_error) && (server_error == actual_server_error) && (client_error == actual_client_error);
}
bool lostExpectedError(int actual_server_error, int actual_client_error) const
{
return (server_error && !actual_server_error) || (client_error && !actual_client_error);
}
};
}
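For illustration, a hint could be attached to a query in a multiquery test run like this (a sketch; the table name is hypothetical, and code 60 matches the serverError example in the comment above):

```bash
# --testmode enables hint parsing, -n enables multiquery processing.
clickhouse-client --testmode -n -q "SELECT 1; SELECT * FROM missing_table; -- { serverError 60 }"
```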

View File

@ -30,6 +30,7 @@
#include <Common/ClickHouseRevision.h>
#include <Common/formatReadable.h>
#include <Common/DNSResolver.h>
#include <Common/CurrentThread.h>
#include <Common/escapeForFileName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Client/Connection.h>
@ -2143,6 +2144,9 @@ void ClusterCopierApp::mainImpl()
context->addDatabase(default_database, std::make_shared<DatabaseMemory>(default_database));
context->setCurrentDatabase(default_database);
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(*context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, *context);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);

View File

@ -25,6 +25,7 @@
#include <Parsers/IAST.h>
#include <common/ErrorHandlers.h>
#include <Common/StatusFile.h>
#include <Common/ThreadStatus.h>
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <TableFunctions/registerTableFunctions.h>
@ -270,6 +271,9 @@ void LocalServer::processQueries()
context->setCurrentQueryId("");
applyCmdSettings(*context);
/// Use the same query_id (and thread group) for all queries
CurrentThread::QueryScope query_scope_holder(*context);
bool echo_query = config().hasOption("echo") || config().hasOption("verbose");
std::exception_ptr exception;

View File

@ -56,6 +56,10 @@ int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#if ENABLE_CLICKHOUSE_OBFUSCATOR
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
int mainEntryClickHouseODBCBridge(int argc, char ** argv);
#endif
#if USE_EMBEDDED_COMPILER
int mainEntryClickHouseClang(int argc, char ** argv);
@ -101,6 +105,10 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
#if ENABLE_CLICKHOUSE_OBFUSCATOR
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
#if ENABLE_CLICKHOUSE_ODBC_BRIDGE || !defined(ENABLE_CLICKHOUSE_ODBC_BRIDGE)
{"odbc-bridge", mainEntryClickHouseODBCBridge},
#endif
#if USE_EMBEDDED_COMPILER
{"clang", mainEntryClickHouseClang},
{"clang++", mainEntryClickHouseClang},

View File

@ -0,0 +1,31 @@
add_library (clickhouse-odbc-bridge-lib
PingHandler.cpp
MainHandler.cpp
ColumnInfoHandler.cpp
HandlerFactory.cpp
ODBCBridge.cpp
validateODBCConnectionString.cpp
)
target_link_libraries (clickhouse-odbc-bridge-lib clickhouse_common_io daemon dbms)
target_include_directories (clickhouse-odbc-bridge-lib PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
if (USE_POCO_SQLODBC)
target_link_libraries (clickhouse-odbc-bridge-lib ${Poco_SQLODBC_LIBRARY})
target_include_directories (clickhouse-odbc-bridge-lib SYSTEM PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_SQLODBC_INCLUDE_DIRS})
endif ()
if (USE_POCO_DATAODBC)
target_link_libraries (clickhouse-odbc-bridge-lib ${Poco_DataODBC_LIBRARY})
target_include_directories (clickhouse-odbc-bridge-lib SYSTEM PRIVATE ${ODBC_INCLUDE_DIRECTORIES} ${Poco_DataODBC_INCLUDE_DIRS})
endif()
if (ENABLE_TESTS)
add_subdirectory (tests)
endif ()
if (CLICKHOUSE_SPLIT_BINARY)
add_executable (clickhouse-odbc-bridge odbc-bridge.cpp)
target_link_libraries (clickhouse-odbc-bridge clickhouse-odbc-bridge-lib)
endif ()

View File

@ -0,0 +1,123 @@
#include "ColumnInfoHandler.h"
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
#include <Poco/Data/ODBC/ODBCException.h>
#include <Poco/Data/ODBC/SessionImpl.h>
#include <Poco/Data/ODBC/Utility.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <DataTypes/DataTypeFactory.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <Common/HTMLForm.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include "validateODBCConnectionString.h"
namespace DB
{
namespace
{
DataTypePtr getDataType(SQLSMALLINT type)
{
const auto & factory = DataTypeFactory::instance();
switch (type)
{
case SQL_INTEGER:
return factory.get("Int32");
case SQL_SMALLINT:
return factory.get("Int16");
case SQL_FLOAT:
return factory.get("Float32");
case SQL_REAL:
return factory.get("Float32");
case SQL_DOUBLE:
return factory.get("Float64");
case SQL_DATETIME:
return factory.get("DateTime");
case SQL_TYPE_TIMESTAMP:
return factory.get("DateTime");
case SQL_TYPE_DATE:
return factory.get("Date");
default:
return factory.get("String");
}
}
}
void ODBCColumnsInfoHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};
if (!params.has("table"))
{
process_error("No 'table' param in request URL");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
}
std::string table_name = params.get("table");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Will fetch info for table '" << table_name << "'");
LOG_TRACE(log, "Got connection str '" << connection_string << "'");
try
{
Poco::Data::ODBC::SessionImpl session(validateODBCConnectionString(connection_string), DBMS_DEFAULT_CONNECT_TIMEOUT_SEC);
SQLHDBC hdbc = session.dbc().handle();
SQLHSTMT hstmt = nullptr;
if (Poco::Data::ODBC::Utility::isError(SQLAllocStmt(hdbc, &hstmt)))
throw Poco::Data::ODBC::ODBCException("Could not allocate connection handle.");
SCOPE_EXIT(SQLFreeStmt(hstmt, SQL_DROP));
/// TODO Why not do SQLColumns instead?
std::string query = "SELECT * FROM " + table_name + " WHERE 1 = 0";
if (Poco::Data::ODBC::Utility::isError(Poco::Data::ODBC::SQLPrepare(hstmt, reinterpret_cast<SQLCHAR *>(&query[0]), query.size())))
throw Poco::Data::ODBC::DescriptorException(session.dbc());
if (Poco::Data::ODBC::Utility::isError(SQLExecute(hstmt)))
throw Poco::Data::ODBC::StatementException(hstmt);
SQLSMALLINT cols = 0;
if (Poco::Data::ODBC::Utility::isError(SQLNumResultCols(hstmt, &cols)))
throw Poco::Data::ODBC::StatementException(hstmt);
/// TODO cols not checked
NamesAndTypesList columns;
for (SQLSMALLINT ncol = 1; ncol <= cols; ++ncol)
{
SQLSMALLINT type = 0;
/// TODO Why 301?
SQLCHAR column_name[301];
/// TODO Result is not checked.
Poco::Data::ODBC::SQLDescribeCol(hstmt, ncol, column_name, sizeof(column_name), NULL, &type, NULL, NULL, NULL);
columns.emplace_back(reinterpret_cast<char *>(column_name), getDataType(type));
}
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
writeStringBinary(columns.toString(), out);
}
catch (...)
{
process_error("Error getting columns from ODBC '" + getCurrentExceptionMessage(false) + "'");
tryLogCurrentException(log);
}
}
}
#endif
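A sketch of calling this handler directly (the DSN and table name are hypothetical; the response is a binary length-prefixed NamesAndTypesList string):

```bash
curl -X POST "http://localhost:9018/columns_info?table=test&connection_string=DSN%3DClickHouse"
```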

View File

@ -0,0 +1,30 @@
#pragma once
#include <Common/config.h>
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
/** The structure of the table is taken from the query "SELECT * FROM table WHERE 1=0".
* TODO: It would be much better to utilize ODBC methods dedicated to column description.
* If there is no such table, an exception is thrown.
*/
namespace DB
{
class ODBCColumnsInfoHandler : public Poco::Net::HTTPRequestHandler
{
public:
ODBCColumnsInfoHandler(size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
: log(&Poco::Logger::get("ODBCColumnsInfoHandler")), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
Poco::Logger * log;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
};
}
#endif

View File

@ -0,0 +1,34 @@
#include "HandlerFactory.h"
#include "PingHandler.h"
#include "ColumnInfoHandler.h"
#include <Common/HTMLForm.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <common/logger_useful.h>
namespace DB
{
Poco::Net::HTTPRequestHandler * HandlerFactory::createRequestHandler(const Poco::Net::HTTPServerRequest & request)
{
Poco::URI uri{request.getURI()};
LOG_TRACE(log, "Request URI: " + uri.toString());
if (uri.getPath() == "/ping" && request.getMethod() == Poco::Net::HTTPRequest::HTTP_GET)
return new PingHandler(keep_alive_timeout);
if (request.getMethod() == Poco::Net::HTTPRequest::HTTP_POST)
{
if (uri.getPath() == "/columns_info")
#if USE_POCO_SQLODBC || USE_POCO_DATAODBC
return new ODBCColumnsInfoHandler(keep_alive_timeout, context);
#else
return nullptr;
#endif
else
return new ODBCHandler(pool_map, keep_alive_timeout, context);
}
return nullptr;
}
}

View File

@ -0,0 +1,38 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#include <Poco/Net/HTTPRequestHandlerFactory.h>
#include "MainHandler.h"
#include "ColumnInfoHandler.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace DB
{
/** Factory for '/ping', '/' and '/columns_info' handlers.
* Also stores Session pools for ODBC connections
*/
class HandlerFactory : public Poco::Net::HTTPRequestHandlerFactory
{
public:
HandlerFactory(const std::string & name_, size_t keep_alive_timeout_, std::shared_ptr<Context> context_)
: log(&Poco::Logger::get(name_)), name(name_), keep_alive_timeout(keep_alive_timeout_), context(context_)
{
pool_map = std::make_shared<ODBCHandler::PoolMap>();
}
Poco::Net::HTTPRequestHandler * createRequestHandler(const Poco::Net::HTTPServerRequest & request) override;
private:
Poco::Logger * log;
std::string name;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
std::shared_ptr<ODBCHandler::PoolMap> pool_map;
};
}

View File

@ -0,0 +1,126 @@
#include "MainHandler.h"
#include "validateODBCConnectionString.h"
#include <memory>
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Dictionaries/ODBCBlockInputStream.h>
#include <Formats/BinaryRowInputStream.h>
#include <Formats/FormatFactory.h>
#include <IO/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Poco/Ext/SessionPoolHelpers.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Common/HTMLForm.h>
#include <common/logger_useful.h>
namespace DB
{
namespace
{
std::unique_ptr<Block> parseColumns(std::string && column_string)
{
std::unique_ptr<Block> sample_block = std::make_unique<Block>();
auto names_and_types = NamesAndTypesList::parse(column_string);
for (const NameAndTypePair & column_data : names_and_types)
sample_block->insert({column_data.type, column_data.name});
return sample_block;
}
}
ODBCHandler::PoolPtr ODBCHandler::getPool(const std::string & connection_str)
{
std::lock_guard lock(mutex);
if (!pool_map->count(connection_str))
{
pool_map->emplace(connection_str, createAndCheckResizePocoSessionPool([connection_str] {
return std::make_shared<Poco::Data::SessionPool>("ODBC", validateODBCConnectionString(connection_str));
}));
}
return pool_map->at(connection_str);
}
void ODBCHandler::handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response)
{
Poco::Net::HTMLForm params(request, request.stream());
LOG_TRACE(log, "Request URI: " + request.getURI());
auto process_error = [&response, this](const std::string & message) {
response.setStatusAndReason(Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR);
if (!response.sent())
response.send() << message << std::endl;
LOG_WARNING(log, message);
};
if (!params.has("query"))
{
process_error("No 'query' in request body");
return;
}
if (!params.has("columns"))
{
process_error("No 'columns' in request URL");
return;
}
if (!params.has("connection_string"))
{
process_error("No 'connection_string' in request URL");
return;
}
size_t max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
std::string max_block_size_str = params.get("max_block_size", "");
if (max_block_size_str.empty())
{
process_error("Empty max_block_size specified");
return;
}
max_block_size = parse<size_t>(max_block_size_str);
}
std::string columns = params.get("columns");
std::unique_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
}
catch (const Exception & ex)
{
process_error("Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTrace().toString());
return;
}
std::string format = params.get("format", "RowBinary");
std::string query = params.get("query");
LOG_TRACE(log, "Query: " << query);
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '" << connection_string << "'");
WriteBufferFromHTTPServerResponse out(request, response, keep_alive_timeout);
try
{
BlockOutputStreamPtr writer = FormatFactory::instance().getOutput(format, out, *sample_block, *context);
auto pool = getPool(connection_string);
ODBCBlockInputStream inp(pool->get(), query, *sample_block, max_block_size);
copyData(inp, *writer);
}
catch (...)
{
auto message = getCurrentExceptionMessage(true);
response.setStatusAndReason(
Poco::Net::HTTPResponse::HTTP_INTERNAL_SERVER_ERROR); // can't call process_error because the response has already started to be sent
writeStringBinary(message, out);
tryLogCurrentException(log);
}
}
}

View File

@ -0,0 +1,49 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <Poco/Net/HTTPRequestHandler.h>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include <Poco/Data/SessionPool.h>
#pragma GCC diagnostic pop
namespace DB
{
/** Main handler for requests to ODBC driver
* Requires connection_string and columns in the request params
* and also the query in the request body.
* The response is in RowBinary format.
*/
class ODBCHandler : public Poco::Net::HTTPRequestHandler
{
public:
using PoolPtr = std::shared_ptr<Poco::Data::SessionPool>;
using PoolMap = std::unordered_map<std::string, PoolPtr>;
ODBCHandler(std::shared_ptr<PoolMap> pool_map_,
size_t keep_alive_timeout_,
std::shared_ptr<Context> context_)
: log(&Poco::Logger::get("ODBCHandler"))
, pool_map(pool_map_)
, keep_alive_timeout(keep_alive_timeout_)
, context(context_)
{
}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
Poco::Logger * log;
std::shared_ptr<PoolMap> pool_map;
size_t keep_alive_timeout;
std::shared_ptr<Context> context;
static inline std::mutex mutex;
PoolPtr getPool(const std::string & connection_str);
};
}

View File

@ -0,0 +1,205 @@
#include "ODBCBridge.h"
#include "HandlerFactory.h"
#include <string>
#include <errno.h>
#include <IO/ReadHelpers.h>
#include <boost/program_options.hpp>
#include <Poco/Net/HTTPServer.h>
#include <Poco/Net/NetException.h>
#include <Poco/String.h>
#include <Poco/Util/HelpFormatter.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/config.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <ext/range.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
namespace
{
Poco::Net::SocketAddress makeSocketAddress(const std::string & host, UInt16 port, Poco::Logger * log)
{
Poco::Net::SocketAddress socket_address;
try
{
socket_address = Poco::Net::SocketAddress(host, port);
}
catch (const Poco::Net::DNSException & e)
{
const auto code = e.code();
if (code == EAI_FAMILY
#if defined(EAI_ADDRFAMILY)
|| code == EAI_ADDRFAMILY
#endif
)
{
LOG_ERROR(log,
"Cannot resolve listen_host (" << host << "), error " << e.code() << ": " << e.message()
<< ". "
"If it is an IPv6 address and your host has disabled IPv6, then consider to "
"specify IPv4 address to listen in <listen_host> element of configuration "
"file. Example: <listen_host>0.0.0.0</listen_host>");
}
throw;
}
return socket_address;
}
Poco::Net::SocketAddress socketBindListen(Poco::Net::ServerSocket & socket, const std::string & host, UInt16 port, Poco::Logger * log)
{
auto address = makeSocketAddress(host, port, log);
#if POCO_VERSION < 0x01080000
socket.bind(address, /* reuseAddress = */ true);
#else
socket.bind(address, /* reuseAddress = */ true, /* reusePort = */ false);
#endif
socket.listen(/* backlog = */ 64);
return address;
};
}
void ODBCBridge::handleHelp(const std::string &, const std::string &)
{
Poco::Util::HelpFormatter helpFormatter(options());
helpFormatter.setCommand(commandName());
helpFormatter.setHeader("HTTP proxy for ODBC requests");
helpFormatter.setUsage("--http-port <port>");
helpFormatter.format(std::cerr);
stopOptionsProcessing();
}
void ODBCBridge::defineOptions(Poco::Util::OptionSet & options)
{
options.addOption(Poco::Util::Option("http-port", "", "port to listen").argument("http-port", true).binding("http-port"));
options.addOption(
Poco::Util::Option("listen-host", "", "hostname to listen, default localhost").argument("listen-host").binding("listen-host"));
options.addOption(
Poco::Util::Option("http-timeout", "", "http timout for socket, default 1800").argument("http-timeout").binding("http-timeout"));
options.addOption(Poco::Util::Option("max-server-connections", "", "max connections to server, default 1024")
.argument("max-server-connections")
.binding("max-server-connections"));
options.addOption(Poco::Util::Option("keep-alive-timeout", "", "keepalive timeout, default 10")
.argument("keep-alive-timeout")
.binding("keep-alive-timeout"));
options.addOption(Poco::Util::Option("log-level", "", "sets log level, default info").argument("log-level").binding("logger.level"));
options.addOption(
Poco::Util::Option("log-path", "", "log path for all logs, default console").argument("log-path").binding("logger.log"));
options.addOption(Poco::Util::Option("err-log-path", "", "err log path for all logs, default no")
.argument("err-log-path")
.binding("logger.errorlog"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message")
.binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
ServerApplication::defineOptions(options); /// Don't need complex BaseDaemon's .xml config
}
void ODBCBridge::initialize(Application & self)
{
BaseDaemon::closeFDs();
is_help = config().has("help");
if (is_help)
return;
if (!config().has("logger.log"))
config().setBool("logger.console", true);
config().setString("logger", "ODBCBridge");
buildLoggers(config());
log = &logger();
hostname = config().getString("listen-host", "localhost");
port = config().getUInt("http-port");
if (port > 0xFFFF)
throw Exception("Out of range 'http-port': " + std::to_string(port), ErrorCodes::ARGUMENT_OUT_OF_BOUND);
http_timeout = config().getUInt("http-timeout", DEFAULT_HTTP_READ_BUFFER_TIMEOUT);
max_server_connections = config().getUInt("max-server-connections", 1024);
keep_alive_timeout = config().getUInt("keep-alive-timeout", 10);
initializeTerminationAndSignalProcessing();
ServerApplication::initialize(self);
}
void ODBCBridge::uninitialize()
{
BaseDaemon::uninitialize();
}
int ODBCBridge::main(const std::vector<std::string> & /*args*/)
{
if (is_help)
return Application::EXIT_OK;
LOG_INFO(log, "Starting up");
Poco::Net::ServerSocket socket;
auto address = socketBindListen(socket, hostname, port, log);
socket.setReceiveTimeout(http_timeout);
socket.setSendTimeout(http_timeout);
Poco::ThreadPool server_pool(3, max_server_connections);
Poco::Net::HTTPServerParams::Ptr http_params = new Poco::Net::HTTPServerParams;
http_params->setTimeout(http_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
context = std::make_shared<Context>(Context::createGlobal());
context->setGlobalContext(*context);
auto server = Poco::Net::HTTPServer(
new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params);
server.start();
LOG_INFO(log, "Listening http://" + address.toString());
SCOPE_EXIT({
LOG_DEBUG(log, "Received termination signal.");
LOG_DEBUG(log, "Waiting for current connections to close.");
server.stop();
for (size_t count : ext::range(1, 6))
{
if (server.currentConnections() == 0)
break;
LOG_DEBUG(log, "Waiting for " << server.currentConnections() << " connections, try " << count);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
});
waitForTerminationRequest();
return Application::EXIT_OK;
}
}
int mainEntryClickHouseODBCBridge(int argc, char ** argv)
{
DB::ODBCBridge app;
try
{
return app.run(argc, argv);
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << "\n";
auto code = DB::getCurrentExceptionCode();
return code ? code : 1;
}
}

View File

@ -0,0 +1,41 @@
#pragma once
#include <Interpreters/Context.h>
#include <Poco/Logger.h>
#include <daemon/BaseDaemon.h>
namespace DB
{
/** Class represents the clickhouse-odbc-bridge server, which listens for
* incoming HTTP POST and GET requests on the specified port and host.
* Has two handlers: '/' for all incoming POST requests to the ODBC driver
* and '/ping' for GET requests about service status.
*/
class ODBCBridge : public BaseDaemon
{
public:
void defineOptions(Poco::Util::OptionSet & options) override;
protected:
void initialize(Application & self) override;
void uninitialize() override;
int main(const std::vector<std::string> & args) override;
private:
void handleHelp(const std::string &, const std::string &);
bool is_help;
std::string hostname;
size_t port;
size_t http_timeout;
std::string log_level;
size_t max_server_connections;
size_t keep_alive_timeout;
Poco::Logger * log;
std::shared_ptr<Context> context; /// need for settings only
};
}

View File

@ -0,0 +1,22 @@
#include "PingHandler.h"
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Common/Exception.h>
#include <IO/HTTPCommon.h>
namespace DB
{
void PingHandler::handleRequest(Poco::Net::HTTPServerRequest & /*request*/, Poco::Net::HTTPServerResponse & response)
{
try
{
setResponseDefaultHeaders(response, keep_alive_timeout);
const char * data = "Ok.\n";
response.sendBuffer(data, strlen(data));
}
catch (...)
{
tryLogCurrentException("PingHandler");
}
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Poco/Net/HTTPRequestHandler.h>
namespace DB
{
/** Simple ping handler, answers "Ok." to GET request
*/
class PingHandler : public Poco::Net::HTTPRequestHandler
{
public:
PingHandler(size_t keep_alive_timeout_) : keep_alive_timeout(keep_alive_timeout_) {}
void handleRequest(Poco::Net::HTTPServerRequest & request, Poco::Net::HTTPServerResponse & response) override;
private:
size_t keep_alive_timeout;
};
}

View File

@ -0,0 +1,38 @@
# clickhouse-odbc-bridge
Simple HTTP server that works like a proxy for an ODBC driver. The main motivation
is that segfaults or other faults in ODBC implementations can
crash the whole clickhouse-server process.
This tool works via HTTP, not via pipes, shared memory, or TCP because:
- It's simpler to implement
- It's simpler to debug
- jdbc-bridge can be implemented in the same way
## Usage
`clickhouse-server` uses this tool inside the odbc table function and StorageODBC.
However, it can also be used as a standalone tool from the command line, with the following
parameters in the POST request URL:
- `connection_string` -- ODBC connection string.
- `columns` -- columns in ClickHouse NamesAndTypesList format: the name in backticks,
the type as a string. Name and type are space-separated, rows are separated by
newlines.
- `max_block_size` -- optional parameter, sets the maximum size of a single block.
The query is sent in the POST body. The response is returned in RowBinary format.
## Example:
```bash
$ clickhouse-odbc-bridge --http-port 9018 --daemon
$ curl -d "query=SELECT PageID, ImpID, AdType FROM Keys ORDER BY PageID, ImpID" --data-urlencode "connection_string=DSN=ClickHouse;DATABASE=stat" --data-urlencode "columns=columns format version: 1
3 columns:
\`PageID\` String
\`ImpID\` String
\`AdType\` String
" "http://localhost:9018/" > result.txt
$ cat result.txt
12246623837185725195925621517
```
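The `/ping` handler can serve as a basic health check:

```bash
$ curl "http://localhost:9018/ping"
Ok.
```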

View File

@ -0,0 +1,2 @@
int mainEntryClickHouseODBCBridge(int argc, char ** argv);
int main(int argc_, char ** argv_) { return mainEntryClickHouseODBCBridge(argc_, argv_); }

View File

@ -0,0 +1,2 @@
add_executable (validate-odbc-connection-string validate-odbc-connection-string.cpp)
target_link_libraries (validate-odbc-connection-string clickhouse-odbc-bridge-lib)

View File

@ -1,6 +1,6 @@
#include <iostream>
#include <Common/Exception.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include "../validateODBCConnectionString.h"
using namespace DB;

View File

@ -5,7 +5,7 @@
#include <common/find_first_symbols.h>
#include <Common/Exception.h>
#include <Common/StringUtils/StringUtils.h>
#include <Dictionaries/validateODBCConnectionString.h>
#include "validateODBCConnectionString.h"
namespace DB
@ -189,22 +189,22 @@ std::string validateODBCConnectionString(const std::string & connection_string)
{
reconstructed_connection_string += '{';
const char * pos = value.data();
const char * end = pos + value.size();
const char * value_pos = value.data();
const char * value_end = value_pos + value.size();
while (true)
{
const char * next_pos = find_first_symbols<'}'>(pos, end);
const char * next_pos = find_first_symbols<'}'>(value_pos, value_end);
if (next_pos == end)
if (next_pos == value_end)
{
reconstructed_connection_string.append(pos, next_pos - pos);
reconstructed_connection_string.append(value_pos, next_pos - value_pos);
break;
}
else
{
reconstructed_connection_string.append(pos, next_pos - pos);
reconstructed_connection_string.append(value_pos, next_pos - value_pos);
reconstructed_connection_string.append("}}");
pos = next_pos + 1;
value_pos = next_pos + 1;
}
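As a sketch of the escaping rule implemented above: the value is wrapped in braces and every `}` inside it is doubled, so a hypothetical value `pa}ss` for key `PWD` is reconstructed as:

```bash
# Input key/value pair:      PWD=pa}ss
# Reconstructed fragment:    PWD={pa}}ss}
```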
}

View File

@ -19,7 +19,7 @@ if (CLICKHOUSE_SPLIT_BINARY)
install (TARGETS clickhouse-server ${CLICKHOUSE_ALL_TARGETS} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
endif ()
if (ARCH_LINUX)
if (OS_LINUX)
set (GLIBC_MAX_REQUIRED 2.4)
add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | grep '@GLIBC' | grep -oP 'GLIBC_[\\d\\.]+' | sort | uniq | sort -r | perl -lnE 'exit 1 if $_ gt q{GLIBC_${GLIBC_MAX_REQUIRED}}'")
endif ()

View File

@ -9,10 +9,11 @@
#include <ext/scope_guard.h>
#include <Common/ExternalTable.h>
#include <Core/ExternalTable.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/CurrentThread.h>
#include <IO/ReadBufferFromIStream.h>
#include <IO/ZlibInflatingReadBuffer.h>
#include <IO/ReadBufferFromString.h>
@ -208,6 +209,13 @@ void HTTPHandler::processQuery(
Poco::Net::HTTPServerResponse & response,
Output & used_output)
{
Context context = server.context();
context.setGlobalContext(server.context());
/// It will forcibly detach the query even if an unexpected error occurred and detachQuery() was not called
/// Normal detaching happens in BlockIO callbacks
CurrentThread::QueryScope query_scope_holder(context);
LOG_TRACE(log, "Request URI: " << request.getURI());
std::istream & istr = request.stream();
@ -257,14 +265,9 @@ void HTTPHandler::processQuery(
}
std::string query_id = params.get("query_id", "");
const auto & config = server.config();
Context context = server.context();
context.setGlobalContext(server.context());
context.setUser(user, password, request.clientAddress(), quota_key);
context.setCurrentQueryId(query_id);
CurrentThread::attachQueryContext(context);
/// The user could specify session identifier and session timeout.
/// It allows to modify settings, create temporary tables and reuse them in subsequent requests.
@ -273,6 +276,7 @@ void HTTPHandler::processQuery(
String session_id;
std::chrono::steady_clock::duration session_timeout;
bool session_is_set = params.has("session_id");
const auto & config = server.config();
if (session_is_set)
{
@ -421,34 +425,45 @@ void HTTPHandler::processQuery(
std::unique_ptr<ReadBuffer> in;
// Used in case of POST request with form-data, but it is not expected to be deleted after that scope
static const NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query", "session_id", "session_timeout", "session_check"};
Names reserved_param_suffixes;
auto param_could_be_skipped = [&] (const String & name)
{
if (reserved_param_names.count(name))
return true;
for (const String & suffix : reserved_param_suffixes)
{
if (endsWith(name, suffix))
return true;
}
return false;
};
/// Used in case of POST request with form-data, but it isn't expected to be deleted after that scope.
std::string full_query;
/// Support for "external data for query processing".
if (startsWith(request.getContentType().data(), "multipart/form-data"))
{
ExternalTablesHandler handler(context, params);
/// Params are of both form params POST and uri (GET params)
params.load(request, istr, handler);
for (const auto & it : params)
{
if (it.first == "query")
{
full_query += it.second;
}
}
in = std::make_unique<ReadBufferFromString>(full_query);
/// Skip unneeded parameters to avoid confusing them later with context settings or query parameters.
reserved_param_suffixes.emplace_back("_format");
reserved_param_suffixes.emplace_back("_types");
reserved_param_suffixes.emplace_back("_structure");
/// Erase unneeded parameters to avoid confusing them later with context settings or query
/// parameters.
for (const auto & it : handler.names)
{
params.erase(it + "_format");
params.erase(it + "_types");
params.erase(it + "_structure");
}
/// Params are of both form params POST and uri (GET params)
for (const auto & it : params)
if (it.first == "query")
full_query += it.second;
in = std::make_unique<ReadBufferFromString>(full_query);
}
else
in = std::make_unique<ConcatReadBuffer>(*in_param, *in_post_maybe_compressed);
@ -475,11 +490,6 @@ void HTTPHandler::processQuery(
auto readonly_before_query = settings.readonly;
NameSet reserved_param_names{"query", "compress", "decompress", "user", "password", "quota_key", "query_id", "stacktrace",
"buffer_size", "wait_end_of_query",
"session_id", "session_timeout", "session_check"
};
for (auto it = params.begin(); it != params.end(); ++it)
{
if (it->first == "database")
@ -490,7 +500,7 @@ void HTTPHandler::processQuery(
{
context.setDefaultFormat(it->second);
}
else if (reserved_param_names.find(it->first) != reserved_param_names.end())
else if (param_could_be_skipped(it->first))
{
}
else

View File

@ -81,7 +81,7 @@ void MetricsTransmitter::transmit(std::vector<ProfileEvents::Count> & prev_count
{
for (size_t i = 0, end = ProfileEvents::end(); i < end; ++i)
{
const auto counter = ProfileEvents::counters[i].load(std::memory_order_relaxed);
const auto counter = ProfileEvents::global_counters[i].load(std::memory_order_relaxed);
const auto counter_increment = counter - prev_counters[i];
prev_counters[i] = counter;

View File

@ -22,6 +22,7 @@
#include <Common/getFQDNOrHostName.h>
#include <Common/getMultipleKeysFromConfig.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/TaskStatsInfoGetter.h>
#include <IO/HTTPCommon.h>
#include <Interpreters/AsynchronousMetrics.h>
#include <Interpreters/DDLWorker.h>
@ -301,6 +302,9 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config().has("max_table_size_to_drop"))
global_context->setMaxTableSizeToDrop(config().getUInt64("max_table_size_to_drop"));
if (config().has("max_partition_size_to_drop"))
global_context->setMaxPartitionSizeToDrop(config().getUInt64("max_partition_size_to_drop"));
/// Size of cache for uncompressed blocks. Zero means disabled.
size_t uncompressed_cache_size = config().getUInt64("uncompressed_cache_size", 0);
if (uncompressed_cache_size)
@ -362,6 +366,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
dns_cache_updater = std::make_unique<DNSCacheUpdater>(*global_context);
}
if (!TaskStatsInfoGetter::checkProcessHasRequiredPermissions())
{
LOG_INFO(log, "It looks like the process has not CAP_NET_ADMIN capability, some performance statistics will be disabled."
" It could happen due to incorrect clickhouse package installation."
" You could resolve the problem manually calling 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'");
}
{
Poco::Timespan keep_alive_timeout(config().getUInt("keep_alive_timeout", 10), 0);

View File

@ -1,7 +1,15 @@
#include "TCPHandler.h"
#include <iomanip>
#include <ext/scope_guard.h>
#include <Poco/Net/NetException.h>
#include <daemon/OwnSplitChannel.h>
#include <Common/ClickHouseRevision.h>
#include <Common/CurrentThread.h>
#include <Common/Stopwatch.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/NetException.h>
#include <Common/config_version.h>
#include <IO/Progress.h>
#include <IO/CompressedReadBuffer.h>
#include <IO/CompressedWriteBuffer.h>
@ -15,14 +23,13 @@
#include <Interpreters/executeQuery.h>
#include <Interpreters/Quota.h>
#include <Interpreters/TablesStatus.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Storages/StorageMemory.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Common/ClickHouseRevision.h>
#include <Common/Stopwatch.h>
#include <Common/ExternalTable.h>
#include <Common/NetException.h>
#include <Common/config_version.h>
#include <ext/scope_guard.h>
#include <Core/ExternalTable.h>
#include "TCPHandler.h"
namespace DB
{
@ -140,13 +147,29 @@ void TCPHandler::runImpl()
if (!receivePacket())
continue;
/// Get blocks of temporary tables
readData(global_settings);
CurrentThread::initializeQuery();
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
/// Should we send internal logs to client?
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& query_context.getSettingsRef().send_logs_level.value != "none")
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context.getSettingsRef().send_logs_level.value);
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
}
query_context.setExternalTablesInitializer([&global_settings, this] (Context & context) {
if (&context != &query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
/// Get blocks of temporary tables
readData(global_settings);
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
state.block_in.reset();
state.maybe_compressed_in.reset(); /// For more accurate accounting by MemoryTracker.
});
/// Processing Query
state.io = executeQuery(state.query, query_context, false, state.stage);
@ -163,8 +186,9 @@ void TCPHandler::runImpl()
else
processOrdinaryQuery();
sendEndOfStream();
sendLogs();
sendEndOfStream();
state.reset();
}
catch (const Exception & e)
@ -209,7 +233,20 @@ void TCPHandler::runImpl()
try
{
if (exception)
{
try
{
/// Try to send logs to client, but it could be risky too
/// Assume that we can't break output here
sendLogs();
}
catch (...)
{
tryLogCurrentException(log, "Can't send logs to client");
}
sendException(*exception);
}
}
catch (...)
{
@ -220,6 +257,9 @@ void TCPHandler::runImpl()
try
{
/// It will forcibly detach the query even if an unexpected error occurred and detachQuery() was not called
CurrentThread::detachQueryIfNotDetached();
state.reset();
}
catch (...)
@ -252,12 +292,14 @@ void TCPHandler::readData(const Settings & global_settings)
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
while (1)
sendLogs();
while (true)
{
Stopwatch watch(CLOCK_MONOTONIC_COARSE);
/// We are waiting for a packet from the client. Thus, every `POLL_INTERVAL` seconds check whether we need to shut down.
while (1)
while (true)
{
if (static_cast<ReadBufferFromPocoSocket &>(*in).poll(poll_interval))
break;
@ -289,6 +331,8 @@ void TCPHandler::readData(const Settings & global_settings)
/// We accept and process data. And if they are over, then we leave.
if (!receivePacket())
break;
sendLogs();
}
}
@ -346,6 +390,8 @@ void TCPHandler::processOrdinaryQuery()
sendProgress();
}
sendLogs();
if (async_in.poll(query_context.getSettingsRef().interactive_delay / 1000))
{
/// There is the following result block.
@ -368,6 +414,7 @@ void TCPHandler::processOrdinaryQuery()
sendExtremes();
sendProfileInfo();
sendProgress();
sendLogs();
}
sendData(block);
@ -692,11 +739,14 @@ void TCPHandler::initBlockOutput(const Block & block)
{
if (!state.block_out)
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionSettings(query_context.getSettingsRef()));
else
state.maybe_compressed_out = out;
if (!state.maybe_compressed_out)
{
if (state.compression == Protocol::Compression::Enable)
state.maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(
*out, CompressionSettings(query_context.getSettingsRef()));
else
state.maybe_compressed_out = out;
}
state.block_out = std::make_shared<NativeBlockOutputStream>(
*state.maybe_compressed_out,
@ -705,6 +755,18 @@ void TCPHandler::initBlockOutput(const Block & block)
}
}
void TCPHandler::initLogsBlockOutput(const Block & block)
{
if (!state.logs_block_out)
{
/// Use uncompressed stream since log blocks usually contain only one row
state.logs_block_out = std::make_shared<NativeBlockOutputStream>(
*out,
client_revision,
block.cloneEmpty());
}
}
bool TCPHandler::isQueryCancelled()
{
@ -745,6 +807,7 @@ void TCPHandler::sendData(const Block & block)
initBlockOutput(block);
writeVarUInt(Protocol::Server::Data, *out);
/// Send external table name (empty name is the main table)
writeStringBinary("", *out);
state.block_out->write(block);
@ -753,6 +816,19 @@ void TCPHandler::sendData(const Block & block)
}
void TCPHandler::sendLogData(const Block & block)
{
initLogsBlockOutput(block);
writeVarUInt(Protocol::Server::Log, *out);
/// Send log tag (empty tag is the default tag)
writeStringBinary("", *out);
state.logs_block_out->write(block);
out->next();
}
void TCPHandler::sendException(const Exception & e)
{
writeVarUInt(Protocol::Server::Exception, *out);
@ -784,6 +860,37 @@ void TCPHandler::sendProgress()
}
void TCPHandler::sendLogs()
{
if (!state.logs_queue)
return;
MutableColumns logs_columns;
MutableColumns curr_logs_columns;
size_t rows = 0;
for (; state.logs_queue->tryPop(curr_logs_columns); ++rows)
{
if (rows == 0)
{
logs_columns = std::move(curr_logs_columns);
}
else
{
for (size_t j = 0; j < logs_columns.size(); ++j)
logs_columns[j]->insertRangeFrom(*curr_logs_columns[j], 0, curr_logs_columns[j]->size());
}
}
if (rows > 0)
{
Block block = InternalTextLogsQueue::getSampleBlock();
block.setColumns(std::move(logs_columns));
sendLogData(block);
}
}
void TCPHandler::run()
{
try

View File

@ -5,16 +5,18 @@
#include <Common/getFQDNOrHostName.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <IO/Progress.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <DataStreams/BlockIO.h>
#include <IO/Progress.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <DataStreams/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Client/TimeoutSetter.h>
#include "IServer.h"
namespace CurrentMetrics
{
extern const Metric TCPConnection;
@ -63,6 +65,9 @@ struct QueryState
/// Timeouts setter for current query
std::unique_ptr<TimeoutSetter> timeout_setter;
/// A queue with internal logs that will be passed to client
InternalTextLogsQueuePtr logs_queue;
BlockOutputStreamPtr logs_block_out;
void reset()
{
@ -140,8 +145,10 @@ private:
void sendHello();
void sendData(const Block & block); /// Write a block to the network.
void sendLogData(const Block & block);
void sendException(const Exception & e);
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendProfileInfo();
void sendTotals();
@ -150,6 +157,7 @@ private:
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();
void initBlockOutput(const Block & block);
void initLogsBlockOutput(const Block & block);
bool isQueryCancelled();

View File

@ -0,0 +1 @@
<yandex><listen_host>0.0.0.0</listen_host></yandex>

View File

@ -322,10 +322,12 @@
<!-- Protection from accidental DROP.
If the size of a MergeTree table is greater than max_table_size_to_drop (in bytes), then the table cannot be dropped with any DROP query.
If you want to delete one table and don't want to restart clickhouse-server, you can create a special file <clickhouse-path>/flags/force_drop_table and make the DROP once.
By default max_table_size_to_drop is 50GB, max_table_size_to_drop=0 allows to DROP any tables.
By default max_table_size_to_drop is 50GB; max_table_size_to_drop=0 allows to DROP any tables.
The same for max_partition_size_to_drop.
Uncomment to disable protection.
-->
<!-- <max_table_size_to_drop>0</max_table_size_to_drop> -->
<!-- <max_partition_size_to_drop>0</max_partition_size_to_drop> -->
<!-- Example of parameters for GraphiteMergeTree table engine -->
<graphite_rollup_example>

View File

@ -13,6 +13,7 @@
#include <Common/typeid_cast.h>
#include <Poco/String.h>
#include <DataTypes/DataTypeWithDictionary.h>
namespace DB
@ -41,6 +42,20 @@ void AggregateFunctionFactory::registerFunction(const String & name, Creator cre
ErrorCodes::LOGICAL_ERROR);
}
static DataTypes convertTypesWithDictionaryToNested(const DataTypes & types)
{
DataTypes res_types;
res_types.reserve(types.size());
for (const auto & type : types)
{
if (auto * type_with_dict = typeid_cast<const DataTypeWithDictionary *>(type.get()))
res_types.push_back(type_with_dict->getDictionaryType());
else
res_types.push_back(type);
}
return res_types;
}
AggregateFunctionPtr AggregateFunctionFactory::get(
const String & name,
@ -48,6 +63,8 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
const Array & parameters,
int recursion_level) const
{
auto type_without_dictionary = convertTypesWithDictionaryToNested(argument_types);
/// If one of types is Nullable, we apply aggregate function combinator "Null".
if (std::any_of(argument_types.begin(), argument_types.end(),
@ -57,7 +74,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
if (!combinator)
throw Exception("Logical error: cannot find aggregate function combinator to apply a function to Nullable arguments.", ErrorCodes::LOGICAL_ERROR);
DataTypes nested_types = combinator->transformArguments(argument_types);
DataTypes nested_types = combinator->transformArguments(type_without_dictionary);
AggregateFunctionPtr nested_function;
@ -70,7 +87,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
}
auto res = getImpl(name, argument_types, parameters, recursion_level);
auto res = getImpl(name, type_without_dictionary, parameters, recursion_level);
if (!res)
throw Exception("Logical error: AggregateFunctionFactory returned nullptr", ErrorCodes::LOGICAL_ERROR);
return res;

View File

@ -0,0 +1,30 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionRetention.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
namespace DB
{
namespace
{
AggregateFunctionPtr createAggregateFunctionRetention(const std::string & name, const DataTypes & arguments, const Array & params)
{
assertNoParameters(name, params);
if (arguments.size() > AggregateFunctionRetentionData::max_events)
throw Exception("Too many event arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
return std::make_shared<AggregateFunctionRetention>(arguments);
}
}
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory)
{
factory.registerFunction("retention", createAggregateFunctionRetention, AggregateFunctionFactory::CaseInsensitive);
}
}
View File
@@ -0,0 +1,150 @@
#pragma once
#include <iostream>
#include <sstream>
#include <unordered_set>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeArray.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/ArenaAllocator.h>
#include <Common/typeid_cast.h>
#include <ext/range.h>
#include <bitset>
#include <AggregateFunctions/IAggregateFunction.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
struct AggregateFunctionRetentionData
{
static constexpr auto max_events = 32;
using Events = std::bitset<max_events>;
Events events;
void add(UInt8 event)
{
events.set(event);
}
void merge(const AggregateFunctionRetentionData & other)
{
events |= other.events;
}
void serialize(WriteBuffer & buf) const
{
UInt32 event_value = events.to_ulong();
writeBinary(event_value, buf);
}
void deserialize(ReadBuffer & buf)
{
UInt32 event_value;
readBinary(event_value, buf);
events = event_value;
}
};
/**
* The maximum number of events is 32, which is enough for retention analytics
*
* Usage:
* - retention(cond1, cond2, cond3, ....)
* - returns [cond1_flag, cond1_flag && cond2_flag, cond1_flag && cond3_flag, ...]
*/
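// Illustrative usage (a sketch, not part of this commit; the table and column names below are hypothetical):
//
//   SELECT uid, retention(date = '2018-08-01', date = '2018-08-02', date = '2018-08-03') AS r
//   FROM events
//   GROUP BY uid
//
// Every flag after the first is ANDed with the first condition's flag, as implemented in insertResultInto below.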
class AggregateFunctionRetention final
: public IAggregateFunctionDataHelper<AggregateFunctionRetentionData, AggregateFunctionRetention>
{
private:
UInt8 events_size;
public:
String getName() const override
{
return "retention";
}
AggregateFunctionRetention(const DataTypes & arguments)
{
for (const auto i : ext::range(0, arguments.size()))
{
auto cond_arg = arguments[i].get();
if (!typeid_cast<const DataTypeUInt8 *>(cond_arg))
throw Exception{"Illegal type " + cond_arg->getName() + " of argument " + toString(i) + " of aggregate function "
+ getName() + ", must be UInt8",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
}
events_size = arguments.size();
}
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeArray>(std::make_shared<DataTypeUInt8>());
}
void add(AggregateDataPtr place, const IColumn ** columns, const size_t row_num, Arena *) const override
{
for (const auto i : ext::range(0, events_size))
{
auto event = static_cast<const ColumnVector<UInt8> *>(columns[i])->getData()[row_num];
if (event)
{
this->data(place).add(i);
break;
}
}
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
this->data(place).merge(this->data(rhs));
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
this->data(place).serialize(buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
this->data(place).deserialize(buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
auto & data_to = static_cast<ColumnArray &>(to).getData();
auto & offsets_to = static_cast<ColumnArray &>(to).getOffsets();
const bool first_flag = this->data(place).events.test(0);
data_to.insert(first_flag ? Field(static_cast<UInt64>(1)) : Field(static_cast<UInt64>(0)));
for (const auto i : ext::range(1, events_size))
{
if (first_flag && this->data(place).events.test(i))
data_to.insert(Field(static_cast<UInt64>(1)));
else
data_to.insert(Field(static_cast<UInt64>(0)));
}
offsets_to.push_back(offsets_to.size() == 0 ? events_size : offsets_to.back() + events_size);
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
};
}
View File
@@ -77,7 +77,7 @@ struct QuantileExact
return array[n];
}
return Value();
return std::numeric_limits<Value>::quiet_NaN();
}
/// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
View File
@@ -72,7 +72,7 @@ struct QuantileExactWeighted
size_t size = map.size();
if (0 == size)
return Value();
return std::numeric_limits<Value>::quiet_NaN();
/// Copy the data to a temporary array to get the element you need in order.
using Pair = typename Map::value_type;
View File
@@ -34,6 +34,7 @@ void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionHistogram(AggregateFunctionFactory & factory);
void registerAggregateFunctionRetention(AggregateFunctionFactory & factory);
void registerAggregateFunctions()
{
@@ -59,6 +60,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsBitwise(factory);
registerAggregateFunctionsMaxIntersections(factory);
registerAggregateFunctionHistogram(factory);
registerAggregateFunctionRetention(factory);
}
{
View File
@@ -114,6 +114,7 @@ void Connection::disconnect()
//LOG_TRACE(log_wrapper.get(), "Disconnecting");
in = nullptr;
last_input_packet_type.reset();
out = nullptr; // can write to socket
if (socket)
socket->close();
@@ -379,6 +380,7 @@ void Connection::sendQuery(
maybe_compressed_in.reset();
maybe_compressed_out.reset();
block_in.reset();
block_logs_in.reset();
block_out.reset();
/// Send empty block which means end of data.
@@ -506,20 +508,50 @@ bool Connection::poll(size_t timeout_microseconds)
}
bool Connection::hasReadBufferPendingData() const
bool Connection::hasReadPendingData() const
{
return static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
return last_input_packet_type.has_value() || static_cast<const ReadBufferFromPocoSocket &>(*in).hasPendingData();
}
std::optional<UInt64> Connection::checkPacket(size_t timeout_microseconds)
{
if (last_input_packet_type.has_value())
return last_input_packet_type;
if (hasReadPendingData() || poll(timeout_microseconds))
{
// LOG_TRACE(log_wrapper.get(), "Receiving packet type");
UInt64 packet_type;
readVarUInt(packet_type, *in);
last_input_packet_type.emplace(packet_type);
return last_input_packet_type;
}
return {};
}
Connection::Packet Connection::receivePacket()
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet");
try
{
Packet res;
readVarUInt(res.type, *in);
/// Have we already read packet type?
if (last_input_packet_type)
{
res.type = *last_input_packet_type;
last_input_packet_type.reset();
}
else
{
//LOG_TRACE(log_wrapper.get(), "Receiving packet type");
readVarUInt(res.type, *in);
}
//LOG_TRACE(log_wrapper.get(), "Receiving packet " << res.type << " " << Protocol::Server::toString(res.type));
switch (res.type)
{
@@ -549,6 +581,10 @@ Connection::Packet Connection::receivePacket()
res.block = receiveData();
return res;
case Protocol::Server::Log:
res.block = receiveLogData();
return res;
case Protocol::Server::EndOfStream:
return res;
@@ -576,14 +612,26 @@ Block Connection::receiveData()
//LOG_TRACE(log_wrapper.get(), "Receiving data");
initBlockInput();
return receiveDataImpl(block_in);
}
Block Connection::receiveLogData()
{
initBlockLogsInput();
return receiveDataImpl(block_logs_in);
}
Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
{
String external_table_name;
readStringBinary(external_table_name, *in);
size_t prev_bytes = in->count();
/// Read one block from network.
Block res = block_in->read();
Block res = stream->read();
if (throttler)
throttler->add(in->count() - prev_bytes);
@@ -592,20 +640,39 @@ Block Connection::receiveData()
}
void Connection::initInputBuffers()
{
}
void Connection::initBlockInput()
{
if (!block_in)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
else
maybe_compressed_in = in;
if (!maybe_compressed_in)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_in = std::make_shared<CompressedReadBuffer>(*in);
else
maybe_compressed_in = in;
}
block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision);
}
}
void Connection::initBlockLogsInput()
{
if (!block_logs_in)
{
/// Must return a superset of SystemLogsQueue::getSampleBlock() columns
block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision);
}
}
void Connection::setDescription()
{
auto resolved_address = getResolvedAddress();
View File
@@ -23,6 +23,7 @@
#include <Interpreters/TablesStatus.h>
#include <atomic>
#include <optional>
namespace DB
@@ -138,7 +139,10 @@ public:
bool poll(size_t timeout_microseconds = 0);
/// Check, if has data in read buffer.
bool hasReadBufferPendingData() const;
bool hasReadPendingData() const;
/// Checks if there is input data in connection and reads packet ID.
std::optional<UInt64> checkPacket(size_t timeout_microseconds = 0);
/// Receive packet from server.
Packet receivePacket();
@@ -195,6 +199,7 @@ private:
std::unique_ptr<Poco::Net::StreamSocket> socket;
std::shared_ptr<ReadBuffer> in;
std::shared_ptr<WriteBuffer> out;
std::optional<UInt64> last_input_packet_type;
String query_id;
Protocol::Compression compression; /// Enable data compression for communication.
@@ -214,6 +219,7 @@ private:
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
BlockInputStreamPtr block_logs_in;
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
@@ -249,11 +255,16 @@ private:
bool ping();
Block receiveData();
Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream);
std::unique_ptr<Exception> receiveException();
Progress receiveProgress();
BlockStreamProfileInfo receiveProfileInfo();
void initInputBuffers();
void initBlockInput();
void initBlockLogsInput();
void throwUnexpectedPacket(UInt64 packet_type, const char * expected) const;
};
View File
@@ -226,6 +226,6 @@ ConnectionPoolWithFailover::tryGetEntry(
}
}
return result;
};
}
}
View File
@@ -247,6 +247,7 @@ Connection::Packet MultiplexedConnections::receivePacketUnlocked()
case Protocol::Server::ProfileInfo:
case Protocol::Server::Totals:
case Protocol::Server::Extremes:
case Protocol::Server::Log:
break;
case Protocol::Server::EndOfStream:
@@ -276,7 +277,7 @@ MultiplexedConnections::ReplicaState & MultiplexedConnections::getReplicaForRead
for (const ReplicaState & state : replica_states)
{
Connection * connection = state.connection;
if ((connection != nullptr) && connection->hasReadBufferPendingData())
if ((connection != nullptr) && connection->hasReadPendingData())
read_list.push_back(*connection->socket);
}
View File
@@ -3,10 +3,7 @@
#include <Common/config.h>
#if USE_ICU
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wold-style-cast"
#include <unicode/ucol.h>
#pragma GCC diagnostic pop
#else
#ifdef __clang__
#pragma clang diagnostic push
View File
@@ -4,6 +4,7 @@
#include <IO/WriteBufferFromArena.h>
#include <Common/SipHash.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnsCommon.h>
namespace DB
{
@@ -161,6 +162,25 @@ ColumnPtr ColumnAggregateFunction::permute(const Permutation & perm, size_t limi
return std::move(res);
}
ColumnPtr ColumnAggregateFunction::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}
template <typename Type>
ColumnPtr ColumnAggregateFunction::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
{
auto res = createView();
res->getData().resize(limit);
for (size_t i = 0; i < limit; ++i)
res->getData()[i] = getData()[indexes[i]];
return std::move(res);
}
INSTANTIATE_INDEX_IMPL(ColumnAggregateFunction);
/// Is required to support operations with Set
void ColumnAggregateFunction::updateHashWithValue(size_t n, SipHash & hash) const
{
View File
@@ -156,6 +156,11 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override;
View File
@@ -626,6 +626,44 @@ ColumnPtr ColumnArray::permute(const Permutation & perm, size_t limit) const
return std::move(res);
}
ColumnPtr ColumnArray::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}
template <typename T>
ColumnPtr ColumnArray::indexImpl(const PaddedPODArray<T> & indexes, size_t limit) const
{
if (limit == 0)
return ColumnArray::create(data);
/// Convert indexes to UInt64 in case of overflow.
auto nested_indexes_column = ColumnUInt64::create();
PaddedPODArray<UInt64> & nested_indexes = nested_indexes_column->getData();
nested_indexes.reserve(getOffsets().back());
auto res = ColumnArray::create(data->cloneEmpty());
Offsets & res_offsets = res->getOffsets();
res_offsets.resize(limit);
size_t current_offset = 0;
for (size_t i = 0; i < limit; ++i)
{
for (size_t j = 0; j < sizeAt(indexes[i]); ++j)
nested_indexes.push_back(offsetAt(indexes[i]) + j);
current_offset += sizeAt(indexes[i]);
res_offsets[i] = current_offset;
}
if (current_offset != 0)
res->data = data->index(*nested_indexes_column, current_offset);
return std::move(res);
}
INSTANTIATE_INDEX_IMPL(ColumnArray);
void ColumnArray::getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const
{
size_t s = size();
View File
@@ -71,6 +71,8 @@ public:
void popBack(size_t n) override;
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type> ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
int compareAt(size_t n, size_t m, const IColumn & rhs_, int nan_direction_hint) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
void reserve(size_t n) override;
View File
@@ -30,6 +30,11 @@ ColumnPtr ColumnConst::convertToFullColumn() const
return data->replicate(Offsets(1, s));
}
ColumnPtr ColumnConst::removeLowCardinality() const
{
return ColumnConst::create(data->convertToFullColumnIfWithDictionary(), s);
}
ColumnPtr ColumnConst::filter(const Filter & filt, ssize_t /*result_size_hint*/) const
{
if (s != filt.size())
@@ -63,6 +68,18 @@ ColumnPtr ColumnConst::permute(const Permutation & perm, size_t limit) const
return ColumnConst::create(data, limit);
}
ColumnPtr ColumnConst::index(const IColumn & indexes, size_t limit) const
{
if (limit == 0)
limit = indexes.size();
if (indexes.size() < limit)
throw Exception("Size of indexes (" + toString(indexes.size()) + ") is less than required (" + toString(limit) + ")",
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
return ColumnConst::create(data, limit);
}
MutableColumns ColumnConst::scatter(ColumnIndex num_columns, const Selector & selector) const
{
if (s != selector.size())
View File
@@ -36,6 +36,8 @@ public:
return convertToFullColumn();
}
ColumnPtr removeLowCardinality() const;
std::string getName() const override
{
return "Const(" + data->getName() + ")";
@@ -153,6 +155,7 @@ public:
ColumnPtr filter(const Filter & filt, ssize_t result_size_hint) const override;
ColumnPtr replicate(const Offsets & offsets) const override;
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
void getPermutation(bool reverse, size_t limit, int nan_direction_hint, Permutation & res) const override;
size_t byteSize() const override
View File
@@ -1,4 +1,5 @@
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnsCommon.h>
#include <Common/Arena.h>
#include <Common/SipHash.h>
@@ -258,6 +259,32 @@ ColumnPtr ColumnFixedString::permute(const Permutation & perm, size_t limit) con
return std::move(res);
}
ColumnPtr ColumnFixedString::index(const IColumn & indexes, size_t limit) const
{
return selectIndexImpl(*this, indexes, limit);
}
template <typename Type>
ColumnPtr ColumnFixedString::indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const
{
if (limit == 0)
return ColumnFixedString::create(n);
auto res = ColumnFixedString::create(n);
Chars_t & res_chars = res->chars;
res_chars.resize(n * limit);
size_t offset = 0;
for (size_t i = 0; i < limit; ++i, offset += n)
memcpySmallAllowReadWriteOverflow15(&res_chars[offset], &chars[indexes[i] * n], n);
return std::move(res);
}
ColumnPtr ColumnFixedString::replicate(const Offsets & offsets) const
{
size_t col_size = size();
View File
@@ -108,6 +108,11 @@ public:
ColumnPtr permute(const Permutation & perm, size_t limit) const override;
ColumnPtr index(const IColumn & indexes, size_t limit) const override;
template <typename Type>
ColumnPtr indexImpl(const PaddedPODArray<Type> & indexes, size_t limit) const;
ColumnPtr replicate(const Offsets & offsets) const override;
MutableColumns scatter(ColumnIndex num_columns, const Selector & selector) const override
View File
@@ -88,6 +88,15 @@ ColumnPtr ColumnFunction::permute(const Permutation & perm, size_t limit) const
return ColumnFunction::create(limit, function, capture);
}
ColumnPtr ColumnFunction::index(const IColumn & indexes, size_t limit) const
{
ColumnsWithTypeAndName capture = captured_columns;
for (auto & column : capture)
column.column = column.column->index(indexes, limit);
return ColumnFunction::create(limit, function, capture);
}
std::vector<MutableColumnPtr> ColumnFunction::scatter(IColumn::ColumnIndex num_columns,
const IColumn::Selector & selector) const
Some files were not shown because too many files have changed in this diff.
Some files were not shown because too many files have changed in this diff Show More