Merge branch 'master' into decimal

This commit is contained in:
chertus 2018-08-06 13:38:18 +03:00
commit c590a74981
105 changed files with 1185 additions and 759 deletions

View File

@ -3,26 +3,6 @@ language: generic
matrix:
fast_finish: true
include:
# - os: linux
#
# cache:
# ccache: true
# timeout: 1000
#
# addons:
# apt:
# update: true
# sources:
# - ubuntu-toolchain-r-test
# packages: [ g++-7, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo, openssl ]
#
# env:
# - MATRIX_EVAL="export CC=gcc-7 && export CXX=g++-7"
#
# script:
# - env TEST_RUN= utils/travis/normal.sh
# We need to have gcc7 headers to compile c++17 code on clang
- os: linux
@ -41,33 +21,11 @@ matrix:
packages: [ ninja-build, g++-7, clang-5.0, lld-5.0, libicu-dev, libreadline-dev, libmysqlclient-dev, unixodbc-dev, libltdl-dev, libssl-dev, libboost-dev, zlib1g-dev, libdouble-conversion-dev, libsparsehash-dev, librdkafka-dev, libcapnp-dev, libsparsehash-dev, libgoogle-perftools-dev, bash, expect, python, python-lxml, python-termcolor, curl, perl, sudo, openssl]
env:
- MATRIX_EVAL="export CC=clang-5.0 && export CXX=clang++-5.0"
- MATRIX_EVAL="export CC=clang-5.0 CXX=clang++-5.0"
script:
- utils/travis/normal.sh
# TODO: fix internal compiler
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-5.0 && export DEB_CXX=clang++-5.0"
#
# script:
# - utils/travis/pbuilder.sh
- os: linux
sudo: required
@ -85,69 +43,6 @@ matrix:
script:
- utils/travis/pbuilder.sh
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# update: true
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export ARCH=i386"
#
# script:
# - env PBUILDER_TIMEOUT=40m TEST_TRUE=true TEST_RUN= utils/travis/pbuilder.sh
# TODO: Can't bootstrap bionic on trusty host
# - os: linux
#
# sudo: required
#
# cache:
# timeout: 1000
# directories:
# - /var/cache/pbuilder/ccache
#
# addons:
# apt:
# update: true
# packages: [ pbuilder, fakeroot, debhelper ]
#
# env:
# - MATRIX_EVAL="export DEB_CC=clang-6.0 && export DEB_CXX=clang++-6.0 && export DIST=bionic && export EXTRAPACKAGES='clang-6.0 lld-6.0'"
#
# script:
# - utils/travis/pbuilder.sh
# Cant fit to time limit (48min)
# - os: osx
# osx_image: xcode9.2
#
# cache:
# ccache: true
# timeout: 1000
#
# before_install:
# - brew install unixodbc gcc ccache libtool gettext zlib readline double-conversion gperftools google-sparsehash lz4 zstd || true
# - brew link --overwrite gcc || true
#
# env:
# - MATRIX_EVAL="export CC=gcc-8 && export CXX=g++-8"
#
# script:
# - env CMAKE_FLAGS="-DUSE_INTERNAL_BOOST_LIBRARY=1" utils/travis/normal.sh
allow_failures:
- os: osx

View File

@ -61,10 +61,6 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument")
endif ()
if (ARCH_LINUX)
set (CXX11_ABI "ENABLE" CACHE STRING "Use C++11 ABI: DEFAULT, ENABLE, DISABLE")
endif ()
option (TEST_COVERAGE "Enables flags for test coverage" OFF)
option (ENABLE_TESTS "Enables tests" ${NOT_MSVC})
@ -86,7 +82,7 @@ endif ()
if (CMAKE_LIBRARY_ARCHITECTURE MATCHES "amd64.*|x86_64.*|AMD64.*")
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
if (ARCH_LINUX)
if (OS_LINUX)
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
endif()
endif ()
@ -95,15 +91,7 @@ if (GLIBC_COMPATIBILITY)
set (USE_INTERNAL_MEMCPY ON)
endif ()
if (CXX11_ABI STREQUAL ENABLE)
set (CXX11_ABI_FLAGS "-D_GLIBCXX_USE_CXX11_ABI=1")
elseif (CXX11_ABI STREQUAL DISABLE)
set (CXX11_ABI_FLAGS "-D_GLIBCXX_USE_CXX11_ABI=0")
else ()
set (CXX11_ABI_FLAGS "")
endif ()
set (COMPILER_FLAGS "${COMPILER_FLAGS} ${CXX11_ABI_FLAGS}")
set (COMPILER_FLAGS "${COMPILER_FLAGS}")
string(REGEX MATCH "-?[0-9]+(.[0-9]+)?$" COMPILER_POSTFIX ${CMAKE_CXX_COMPILER})
@ -150,26 +138,29 @@ else ()
endif ()
set (CMAKE_BUILD_COLOR_MAKEFILE ON)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMPILER_FLAGS} ${PLATFORM_EXTRA_CXX_FLAG} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CXX_WARNING_FLAGS} ${GLIBC_COMPATIBILITY_COMPILE_FLAGS}")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${COMPILER_FLAGS} ${PLATFORM_EXTRA_CXX_FLAG} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CXX_WARNING_FLAGS}")
#set (CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_CXX_FLAGS_RELWITHDEBINFO "${CMAKE_CXX_FLAGS_RELWITHDEBINFO} -O3 ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -g3 -ggdb3 -fno-inline ${CMAKE_CXX_FLAGS_ADD}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${GLIBC_COMPATIBILITY_COMPILE_FLAGS} ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} -fno-omit-frame-pointer ${COMMON_WARNING_FLAGS} ${CMAKE_C_FLAGS_ADD}")
#set (CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3 ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -g3 -ggdb3 -fno-inline ${CMAKE_C_FLAGS_ADD}")
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND ARCH_FREEBSD))
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (CMAKE_CXX_COMPILER_ID STREQUAL "Clang" AND OS_FREEBSD))
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libgcc -static-libstdc++")
# Along with executables, we also build example of shared library for "library dictionary source"; and it also should be self-contained.
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -static-libgcc -static-libstdc++")
endif ()
set(THREADS_PREFER_PTHREAD_FLAG ON)
include (cmake/test_compiler.cmake)
if (ARCH_LINUX AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${GLIBC_COMPATIBILITY_LINK_FLAGS} ${CXX11_ABI_FLAGS}")
if (OS_LINUX AND CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX})
set (LIBCXX_PATH "" CACHE STRING "Use custom path for libc++. It should be used for MSan.")
@ -237,7 +228,7 @@ else ()
set(NOT_UNBUNDLED 1)
endif ()
# Using system libs can cause lot of warnings in includes.
if (UNBUNDLED OR NOT (ARCH_LINUX OR APPLE) OR ARCH_32)
if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
option (NO_WERROR "Disable -Werror compiler option" ON)
endif ()
@ -246,19 +237,15 @@ message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE
include(GNUInstallDirs)
include (cmake/find_ssl.cmake)
if (NOT OPENSSL_FOUND)
message (FATAL_ERROR "Need openssl for build. debian tip: sudo apt install libssl-dev")
endif ()
include (cmake/lib_name.cmake)
include (cmake/find_icu4c.cmake)
include (cmake/find_boost.cmake)
# openssl, zlib before poco
include (cmake/find_zlib.cmake)
include (cmake/find_zstd.cmake)
include (cmake/find_ltdl.cmake) # for odbc
include (cmake/find_termcap.cmake)
include (cmake/find_odbc.cmake)
# openssl, zlib, odbc before poco
include (cmake/find_poco.cmake)
include (cmake/find_lz4.cmake)
include (cmake/find_sparsehash.cmake)
@ -278,7 +265,6 @@ include (cmake/find_contrib_lib.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
find_contrib_lib(metrohash)
find_contrib_lib(murmurhash2)
find_contrib_lib(btrie)
find_contrib_lib(double-conversion)

View File

@ -3,7 +3,6 @@ set -e -x
source default-config
./install-os-packages.sh libssl-dev
./install-os-packages.sh libicu-dev
./install-os-packages.sh libreadline-dev

View File

@ -43,9 +43,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO apt-get install -y jq
;;
libssl-dev)
$SUDO apt-get install -y libssl-dev
;;
libicu-dev)
$SUDO apt-get install -y libicu-dev
;;
@ -91,9 +88,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO yum install -y jq
;;
libssl-dev)
$SUDO yum install -y openssl-devel
;;
libicu-dev)
$SUDO yum install -y libicu-devel
;;
@ -133,9 +127,6 @@ case $PACKAGE_MANAGER in
jq)
$SUDO pkg install -y jq
;;
libssl-dev)
$SUDO pkg install -y openssl
;;
libicu-dev)
$SUDO pkg install -y icu
;;

View File

@ -21,7 +21,7 @@ BUILD_TARGETS=clickhouse
BUILD_TYPE=Debug
ENABLE_EMBEDDED_COMPILER=0
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_TCMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_MYSQL=0"
CMAKE_FLAGS="-D CMAKE_C_FLAGS_ADD=-g0 -D CMAKE_CXX_FLAGS_ADD=-g0 -D ENABLE_TCMALLOC=0 -D ENABLE_CAPNP=0 -D ENABLE_RDKAFKA=0 -D ENABLE_UNWIND=0 -D ENABLE_ICU=0 -D ENABLE_POCO_MONGODB=0 -D ENABLE_POCO_NETSSL=0 -D ENABLE_POCO_ODBC=0 -D ENABLE_ODBC=0 -D ENABLE_MYSQL=0"
[[ $(uname) == "FreeBSD" ]] && COMPILER_PACKAGE_VERSION=devel && export COMPILER_PATH=/usr/local/bin

View File

@ -0,0 +1,88 @@
# This file copied from contrib/poco/cmake/FindODBC.cmake to allow build without submodules
#
# Find the ODBC driver manager includes and library.
#
# ODBC is an open standard for connecting to different databases in a
# semi-vendor-independent fashion. First you install the ODBC driver
# manager. Then you need a driver for each separate database you want
# to connect to (unless a generic one works). VTK includes neither
# the driver manager nor the vendor-specific drivers: you have to find
# those yourself.
#
# This module defines
# ODBC_INCLUDE_DIRECTORIES, where to find sql.h
# ODBC_LIBRARIES, the libraries to link against to use ODBC
# ODBC_FOUND. If false, you cannot build anything that requires ODBC.
option (ENABLE_ODBC "Enable ODBC" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" OFF)
endif ()
if (USE_INTERNAL_ODBC_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/unixodbc/README")
message (WARNING "submodule contrib/unixodbc is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_ODBC_LIBRARY 0)
endif ()
if (ENABLE_ODBC)
if (USE_INTERNAL_ODBC_LIBRARY)
set (ODBC_LIBRARIES unixodbc)
set (ODBC_INCLUDE_DIRECTORIES ${CMAKE_SOURCE_DIR}/contrib/unixodbc/include)
set (ODBC_FOUND 1)
set (USE_ODBC 1)
else ()
find_path(ODBC_INCLUDE_DIRECTORIES
NAMES sql.h
HINTS
/usr/include
/usr/include/iodbc
/usr/include/odbc
/usr/local/include
/usr/local/include/iodbc
/usr/local/include/odbc
/usr/local/iodbc/include
/usr/local/odbc/include
"C:/Program Files/ODBC/include"
"C:/Program Files/Microsoft SDKs/Windows/v7.0/include"
"C:/Program Files/Microsoft SDKs/Windows/v6.0a/include"
"C:/ODBC/include"
DOC "Specify the directory containing sql.h."
)
find_library(ODBC_LIBRARIES
NAMES iodbc odbc iodbcinst odbcinst odbc32
HINTS
/usr/lib
/usr/lib/iodbc
/usr/lib/odbc
/usr/local/lib
/usr/local/lib/iodbc
/usr/local/lib/odbc
/usr/local/iodbc/lib
/usr/local/odbc/lib
"C:/Program Files/ODBC/lib"
"C:/ODBC/lib/debug"
"C:/Program Files (x86)/Microsoft SDKs/Windows/v7.0A/Lib"
DOC "Specify the ODBC driver manager library here."
)
# MinGW find usually fails
if(MINGW)
set(ODBC_INCLUDE_DIRECTORIES ".")
set(ODBC_LIBRARIES odbc32)
endif()
include(FindPackageHandleStandardArgs)
find_package_handle_standard_args(ODBC
DEFAULT_MSG
ODBC_INCLUDE_DIRECTORIES
ODBC_LIBRARIES)
mark_as_advanced(ODBC_FOUND ODBC_LIBRARIES ODBC_INCLUDE_DIRECTORIES)
endif ()
endif ()
message (STATUS "Using odbc: ${ODBC_INCLUDE_DIRECTORIES} : ${ODBC_LIBRARIES}")

View File

@ -11,19 +11,12 @@ if ( ( ARCH_ARM AND NOT ARCH_AARCH64 ) OR ARCH_I386)
set (ARCH_32 1)
message (WARNING "Support for 32bit platforms is highly experimental")
endif ()
if (CMAKE_SYSTEM MATCHES "Linux")
set (ARCH_LINUX 1)
set (OS_LINUX 1)
endif ()
if (CMAKE_SYSTEM MATCHES "FreeBSD")
set (ARCH_FREEBSD 1)
endif ()
if (NOT MSVC)
set (NOT_MSVC 1)
endif ()
if (NOT APPLE)
set (NOT_APPLE 1)
set (OS_FREEBSD 1)
endif ()
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")

View File

@ -2,7 +2,7 @@
# TODO: test new libcpuid - maybe already fixed
if (NOT ARCH_ARM)
if (ARCH_FREEBSD)
if (OS_FREEBSD)
set (DEFAULT_USE_INTERNAL_CPUID_LIBRARY 1)
else ()
set (DEFAULT_USE_INTERNAL_CPUID_LIBRARY ${NOT_UNBUNDLED})

View File

@ -1,4 +1,4 @@
if (ARCH_FREEBSD)
if (OS_FREEBSD)
find_library (EXECINFO_LIBRARY execinfo)
find_library (ELF_LIBRARY elf)
message (STATUS "Using execinfo: ${EXECINFO_LIBRARY}")

View File

@ -15,13 +15,20 @@
# ODBC_LIBRARIES, the libraries to link against to use ODBC
# ODBC_FOUND. If false, you cannot build anything that requires ODBC.
option (ENABLE_ODBC "Enable ODBC" ON)
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" ${NOT_UNBUNDLED})
option (ENABLE_ODBC "Enable ODBC" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_ODBC_LIBRARY "Set to FALSE to use system odbc library instead of bundled" OFF)
endif ()
if (USE_INTERNAL_ODBC_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/unixodbc/README")
message (WARNING "submodule contrib/unixodbc is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_ODBC_LIBRARY 0)
endif ()
set (ODBC_INCLUDE_DIRECTORIES ) # Include directories will be either used automatically by target_include_directories or set later.
if (ENABLE_ODBC)
if (USE_INTERNAL_ODBC_LIBRARY)
set (ODBC_LIBRARIES unixodbc)

View File

@ -92,8 +92,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
endif ()
endif ()
# TODO! fix internal ssl
if (OPENSSL_FOUND AND NOT USE_INTERNAL_SSL_LIBRARY AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
if (OPENSSL_FOUND AND (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL))
set (Poco_NetSSL_LIBRARY PocoNetSSL)
set (Poco_Crypto_LIBRARY PocoCrypto)
endif ()

View File

@ -13,7 +13,7 @@ endif ()
if (NOT USE_INTERNAL_RDKAFKA_LIBRARY)
find_library (RDKAFKA_LIB rdkafka)
find_path (RDKAFKA_INCLUDE_DIR NAMES librdkafka/rdkafka.h PATHS ${RDKAFKA_INCLUDE_PATHS})
if (USE_STATIC_LIBRARIES AND NOT ARCH_FREEBSD)
if (USE_STATIC_LIBRARIES AND NOT OS_FREEBSD)
find_library (SASL2_LIBRARY sasl2)
endif ()
endif ()

View File

@ -1,7 +1,7 @@
if (APPLE)
# lib from libs/libcommon
set (RT_LIBRARY "apple_rt")
elseif (ARCH_FREEBSD)
elseif (OS_FREEBSD)
find_library (RT_LIBRARY rt)
else ()
set (RT_LIBRARY "")

View File

@ -1,4 +1,4 @@
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${MSVC})
option (USE_INTERNAL_SSL_LIBRARY "Set to FALSE to use system *ssl library instead of bundled" ${OS_LINUX})
set (OPENSSL_USE_STATIC_LIBS ${USE_STATIC_LIBRARIES})

View File

@ -17,7 +17,7 @@ if (NOT ZLIB_FOUND)
set (USE_INTERNAL_ZLIB_LIBRARY 1)
set (ZLIB_COMPAT 1) # for zlib-ng, also enables WITH_GZFILEOP
set (WITH_NATIVE_INSTRUCTIONS ${ARCHNATIVE})
if (ARCH_FREEBSD OR ARCH_I386)
if (OS_FREEBSD OR ARCH_I386)
set (WITH_OPTIM 0 CACHE INTERNAL "") # Bug in assembler
endif ()
if (ARCH_AARCH64)

View File

@ -43,6 +43,8 @@ if (USE_INTERNAL_METROHASH_LIBRARY)
add_subdirectory (libmetrohash)
endif ()
add_subdirectory (murmurhash)
if (USE_INTERNAL_BTRIE_LIBRARY)
add_subdirectory (libbtrie)
endif ()
@ -51,10 +53,6 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind)
endif ()
if (USE_INTERNAL_MURMURHASH2_LIBRARY)
add_subdirectory (libmurmurhash2)
endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
add_subdirectory (${INTERNAL_ZLIB_NAME})
# todo: make pull to Dead2/zlib-ng and remove:
@ -98,6 +96,7 @@ if (USE_INTERNAL_SSL_LIBRARY)
set (BUILD_SHARED 1)
endif ()
set (USE_SHARED ${USE_STATIC_LIBRARIES})
set (LIBRESSL_SKIP_INSTALL 1)
add_subdirectory (ssl)
target_include_directories(${OPENSSL_CRYPTO_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
target_include_directories(${OPENSSL_SSL_LIBRARY} PUBLIC ${OPENSSL_INCLUDE_DIR})
@ -152,11 +151,6 @@ if (USE_INTERNAL_POCO_LIBRARY)
set (_save ${ENABLE_TESTS})
set (ENABLE_TESTS 0)
set (CMAKE_DISABLE_FIND_PACKAGE_ZLIB 1)
if (USE_INTERNAL_SSL_LIBRARY OR (DEFINED ENABLE_POCO_NETSSL AND NOT ENABLE_POCO_NETSSL))
set (DISABLE_INTERNAL_OPENSSL 1 CACHE INTERNAL "")
set (ENABLE_NETSSL 0 CACHE INTERNAL "") # TODO!
set (ENABLE_CRYPTO 0 CACHE INTERNAL "") # TODO!
endif ()
if (MSVC)
set (ENABLE_DATA_ODBC 0 CACHE INTERNAL "") # TODO (build fail)
endif ()

View File

@ -37,7 +37,7 @@ ${JEMALLOC_SOURCE_DIR}/src/witness.c
)
if(CMAKE_SYSTEM_NAME MATCHES "Darwin")
list(APPEND SRCS src/zone.c)
list(APPEND SRCS ${JEMALLOC_SOURCE_DIR}/src/zone.c)
endif()
add_library(jemalloc STATIC ${SRCS})

View File

@ -96,7 +96,8 @@
/*
* Defined if secure_getenv(3) is available.
*/
#define JEMALLOC_HAVE_SECURE_GETENV
// Don't want dependency on newer GLIBC
//#define JEMALLOC_HAVE_SECURE_GETENV
/*
* Defined if issetugid(2) is available.

View File

@ -1,6 +0,0 @@
add_library(murmurhash2
src/murmurhash2.cpp
include/murmurhash2.h)
target_include_directories (murmurhash2 PUBLIC include)
target_include_directories (murmurhash2 PUBLIC src)

View File

@ -1 +0,0 @@
MurmurHash2 was written by Austin Appleby, and is placed in the publicdomain. The author hereby disclaims copyright to this source code.

View File

@ -57,6 +57,8 @@ ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/libmariadb/ma_client_plugin.c
add_library(mysqlclient STATIC ${SRCS})
target_link_libraries(mysqlclient ${OPENSSL_LIBRARIES})
target_include_directories(mysqlclient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/include)
target_include_directories(mysqlclient PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/common/include)
target_include_directories(mysqlclient PUBLIC ${MARIADB_CLIENT_SOURCE_DIR}/include)

View File

@ -0,0 +1,5 @@
add_library(murmurhash
src/murmurhash2.cpp
include/murmurhash2.h)
target_include_directories (murmurhash PUBLIC include)

View File

@ -0,0 +1 @@
MurmurHash was written by Austin Appleby, and is placed in the publicdomain. The author hereby disclaims copyright to this source code.

2
contrib/poco vendored

@ -1 +1 @@
Subproject commit 3a2d0a833a22ef5e1164a9ada54e3253cb038904
Subproject commit 4ab45bc3bb0d2c476ea5385ec2d398c6bfc9f089

2
contrib/ssl vendored

@ -1 +1 @@
Subproject commit 6fbe1c6f404193989c5f6a63115d80fbe34ce2a3
Subproject commit 4f9a7b8745184410dc0b31ba548ce21ac64edd9c

View File

@ -279,9 +279,10 @@ target_link_libraries(unixodbc ltdl)
# SYSTEM_FILE_PATH was changed to /etc
target_include_directories(unixodbc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64)
target_include_directories(unixodbc PRIVATE ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/private)
target_include_directories(unixodbc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64)
target_include_directories(unixodbc PUBLIC ${ODBC_SOURCE_DIR}/include)
target_compile_definitions(unixodbc PRIVATE -DHAVE_CONFIG_H)
target_compile_options(unixodbc PRIVATE -Wno-dangling-else -Wno-parentheses -Wno-unknown-warning-option -O2)
target_compile_options(unixodbc PRIVATE -Wno-dangling-else -Wno-parentheses -Wno-misleading-indentation -Wno-unknown-warning-option -O2)

View File

@ -53,6 +53,7 @@ add_headers_and_sources(dbms src/Interpreters/ClusterProxy)
add_headers_and_sources(dbms src/Columns)
add_headers_and_sources(dbms src/Storages)
add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/Kafka)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
@ -84,7 +85,7 @@ list (APPEND dbms_headers src/TableFunctions/ITableFunction.h src/TableFunctions
add_library(clickhouse_common_io ${SPLIT_SHARED} ${clickhouse_common_io_headers} ${clickhouse_common_io_sources})
if (ARCH_FREEBSD)
if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
endif ()

View File

@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54403 CACHE STRING "")
set(VERSION_REVISION 54404 CACHE STRING "")
set(VERSION_MAJOR 18 CACHE STRING "")
set(VERSION_MINOR 8 CACHE STRING "")
set(VERSION_MINOR 9 CACHE STRING "")
set(VERSION_PATCH 0 CACHE STRING "")
set(VERSION_GITHASH e6be3df322f24ff3aa9ae9a97b9b01b2c88ab7b0 CACHE STRING "")
set(VERSION_DESCRIBE v18.8.0-testing CACHE STRING "")
set(VERSION_STRING 18.8.0 CACHE STRING "")
set(VERSION_GITHASH c83721a02db002eef7ff864f82d53ca89d47f9e6 CACHE STRING "")
set(VERSION_DESCRIBE v18.9.0-testing CACHE STRING "")
set(VERSION_STRING 18.9.0 CACHE STRING "")
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")
@ -14,18 +14,11 @@ set(VERSION_TWEAK "" CACHE STRING "")
if (VERSION_TWEAK)
string(CONCAT VERSION_STRING ${VERSION_STRING} "." ${VERSION_TWEAK})
endif ()
if (VERSION_EXTRA)
string(CONCAT VERSION_STRING ${VERSION_STRING} "." ${VERSION_EXTRA})
endif ()
set (VERSION_NAME "${PROJECT_NAME}")
set (VERSION_FULL "${VERSION_NAME} ${VERSION_STRING}")
if (APPLE)
# dirty hack: ld: malformed 64-bit a.b.c.d.e version number: 1.1.54160
math (EXPR VERSION_SO1 "${VERSION_REVISION}/255")
math (EXPR VERSION_SO2 "${VERSION_REVISION}%255")
set (VERSION_SO "${VERSION_MAJOR}.${VERSION_MINOR}.${VERSION_SO1}.${VERSION_SO2}")
else ()
set (VERSION_SO "${VERSION_STRING}")
endif ()
set (VERSION_NAME "${PROJECT_NAME}" CACHE STRING "")
set (VERSION_FULL "${VERSION_NAME} ${VERSION_STRING}" CACHE STRING "")
set (VERSION_SO "${VERSION_STRING}" CACHE STRING "")

View File

@ -152,6 +152,6 @@ else ()
endif ()
if (USE_EMBEDDED_COMPILER AND ENABLE_CLICKHOUSE_SERVER)
if (TARGET clickhouse-server AND TARGET copy-headers)
add_dependencies(clickhouse-server copy-headers)
endif ()

View File

@ -19,7 +19,7 @@ if (CLICKHOUSE_SPLIT_BINARY)
install (TARGETS clickhouse-server ${CLICKHOUSE_ALL_TARGETS} RUNTIME DESTINATION ${CMAKE_INSTALL_BINDIR} COMPONENT clickhouse)
endif ()
if (NOT APPLE AND NOT ARCH_FREEBSD)
if (OS_LINUX)
set (GLIBC_MAX_REQUIRED 2.4)
add_test(NAME GLIBC_required_version COMMAND bash -c "readelf -s ${CMAKE_CURRENT_BINARY_DIR}/../clickhouse-server | grep '@GLIBC' | grep -oP 'GLIBC_[\\d\\.]+' | sort | uniq | sort -r | perl -lnE 'exit 1 if $_ gt q{GLIBC_${GLIBC_MAX_REQUIRED}}'")
endif ()

View File

@ -4,6 +4,7 @@
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <Common/FieldVisitors.h>
#include <Common/SipHash.h>
@ -35,6 +36,13 @@ String FieldVisitorDump::operator() (const UInt64 & x) const { return formatQuot
String FieldVisitorDump::operator() (const Int64 & x) const { return formatQuotedWithPrefix(x, "Int64_"); }
String FieldVisitorDump::operator() (const Float64 & x) const { return formatQuotedWithPrefix(x, "Float64_"); }
String FieldVisitorDump::operator() (const UInt128 & x) const
{
WriteBufferFromOwnString wb;
wb << "UInt128_" << x.low << "_" << x.high;
return wb.str();
}
String FieldVisitorDump::operator() (const String & x) const
{
@ -47,14 +55,14 @@ String FieldVisitorDump::operator() (const Array & x) const
{
WriteBufferFromOwnString wb;
wb.write("Array_[", 7);
wb << "Array_[";
for (auto it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb.write(", ", 2);
writeString(applyVisitor(*this, *it), wb);
wb << ", ";
wb << applyVisitor(*this, *it);
}
writeChar(']', wb);
wb << ']';
return wb.str();
}
@ -64,14 +72,14 @@ String FieldVisitorDump::operator() (const Tuple & x_def) const
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
wb.write("Tuple_(", 7);
wb << "Tuple_(";
for (auto it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb.write(", ", 2);
writeString(applyVisitor(*this, *it), wb);
wb << ", ";
wb << applyVisitor(*this, *it);
}
writeChar(')', wb);
wb << ')';
return wb.str();
}
@ -105,19 +113,24 @@ String FieldVisitorToString::operator() (const Int64 & x) const { return formatQ
String FieldVisitorToString::operator() (const Float64 & x) const { return formatFloat(x); }
String FieldVisitorToString::operator() (const String & x) const { return formatQuoted(x); }
String FieldVisitorToString::operator() (const UInt128 & x) const
{
/// Dummy implementation. There is no UInt128 literals in SQL.
return FieldVisitorDump()(x);
}
String FieldVisitorToString::operator() (const Array & x) const
{
WriteBufferFromOwnString wb;
writeChar('[', wb);
wb << '[';
for (Array::const_iterator it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb.write(", ", 2);
writeString(applyVisitor(*this, *it), wb);
wb << applyVisitor(*this, *it);
}
writeChar(']', wb);
wb << ']';
return wb.str();
}
@ -127,14 +140,14 @@ String FieldVisitorToString::operator() (const Tuple & x_def) const
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
writeChar('(', wb);
wb << '(';
for (auto it = x.begin(); it != x.end(); ++it)
{
if (it != x.begin())
wb.write(", ", 2);
writeString(applyVisitor(*this, *it), wb);
wb << ", ";
wb << applyVisitor(*this, *it);
}
writeChar(')', wb);
wb << ')';
return wb.str();
}
@ -155,6 +168,13 @@ void FieldVisitorHash::operator() (const UInt64 & x) const
hash.update(x);
}
void FieldVisitorHash::operator() (const UInt128 & x) const
{
UInt8 type = Field::Types::UInt128;
hash.update(type);
hash.update(x);
}
void FieldVisitorHash::operator() (const Int64 & x) const
{
UInt8 type = Field::Types::Int64;

View File

@ -38,6 +38,7 @@ typename std::decay_t<Visitor>::ResultType applyVisitor(Visitor && visitor, F &&
{
case Field::Types::Null: return visitor(field.template get<Null>());
case Field::Types::UInt64: return visitor(field.template get<UInt64>());
case Field::Types::UInt128: return visitor(field.template get<UInt128>());
case Field::Types::Int64: return visitor(field.template get<Int64>());
case Field::Types::Float64: return visitor(field.template get<Float64>());
case Field::Types::String: return visitor(field.template get<String>());
@ -57,6 +58,7 @@ static typename std::decay_t<Visitor>::ResultType applyBinaryVisitorImpl(Visitor
{
case Field::Types::Null: return visitor(field1, field2.template get<Null>());
case Field::Types::UInt64: return visitor(field1, field2.template get<UInt64>());
case Field::Types::UInt128: return visitor(field1, field2.template get<UInt128>());
case Field::Types::Int64: return visitor(field1, field2.template get<Int64>());
case Field::Types::Float64: return visitor(field1, field2.template get<Float64>());
case Field::Types::String: return visitor(field1, field2.template get<String>());
@ -79,6 +81,9 @@ typename std::decay_t<Visitor>::ResultType applyVisitor(Visitor && visitor, F1 &
case Field::Types::UInt64:
return applyBinaryVisitorImpl(
std::forward<Visitor>(visitor), field1.template get<UInt64>(), std::forward<F2>(field2));
case Field::Types::UInt128:
return applyBinaryVisitorImpl(
std::forward<Visitor>(visitor), field1.template get<UInt128>(), std::forward<F2>(field2));
case Field::Types::Int64:
return applyBinaryVisitorImpl(
std::forward<Visitor>(visitor), field1.template get<Int64>(), std::forward<F2>(field2));
@ -107,6 +112,7 @@ class FieldVisitorToString : public StaticVisitor<String>
public:
String operator() (const Null & x) const;
String operator() (const UInt64 & x) const;
String operator() (const UInt128 & x) const;
String operator() (const Int64 & x) const;
String operator() (const Float64 & x) const;
String operator() (const String & x) const;
@ -121,6 +127,7 @@ class FieldVisitorDump : public StaticVisitor<String>
public:
String operator() (const Null & x) const;
String operator() (const UInt64 & x) const;
String operator() (const UInt128 & x) const;
String operator() (const Int64 & x) const;
String operator() (const Float64 & x) const;
String operator() (const String & x) const;
@ -157,6 +164,11 @@ public:
T operator() (const UInt64 & x) const { return x; }
T operator() (const Int64 & x) const { return x; }
T operator() (const Float64 & x) const { return x; }
T operator() (const UInt128 &) const
{
throw Exception("Cannot convert UInt128 to " + demangle(typeid(T).name()), ErrorCodes::CANNOT_CONVERT_TYPE);
}
};
@ -170,6 +182,7 @@ public:
void operator() (const Null & x) const;
void operator() (const UInt64 & x) const;
void operator() (const UInt128 & x) const;
void operator() (const Int64 & x) const;
void operator() (const Float64 & x) const;
void operator() (const String & x) const;
@ -180,44 +193,60 @@ public:
/** More precise comparison, used for index.
* Differs from Field::operator< and Field::operator== in that it also compares values of different types.
* Comparison rules are same as in FunctionsComparison (to be consistent with expression evaluation in query).
*
* TODO Comparisons of UInt128 with different type are incorrect.
*/
class FieldVisitorAccurateEquals : public StaticVisitor<bool>
{
public:
bool operator() (const Null &, const Null &) const { return true; }
bool operator() (const Null &, const UInt64 &) const { return false; }
bool operator() (const Null &, const UInt128 &) const { return false; }
bool operator() (const Null &, const Int64 &) const { return false; }
bool operator() (const Null &, const Float64 &) const { return false; }
bool operator() (const Null &, const String &) const { return false; }
bool operator() (const Null &, const Array &) const { return false; }
bool operator() (const Null &, const Tuple &) const { return false; }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 & l, const UInt64 & r) const { return l == r; }
bool operator() (const UInt64 &, const UInt128) const { return true; }
bool operator() (const UInt64 & l, const Int64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const UInt64 & l, const Float64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const UInt64 &, const String &) const { return false; }
bool operator() (const UInt64 &, const Array &) const { return false; }
bool operator() (const UInt64 &, const Tuple &) const { return false; }
bool operator() (const UInt64 &, const String &) const { return false; }
bool operator() (const UInt64 &, const Array &) const { return false; }
bool operator() (const UInt64 &, const Tuple &) const { return false; }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const UInt128 &, const Null &) const { return false; }
bool operator() (const UInt128 &, const UInt64) const { return false; }
bool operator() (const UInt128 & l, const UInt128 & r) const { return l == r; }
bool operator() (const UInt128 &, const Int64) const { return false; }
bool operator() (const UInt128 &, const Float64) const { return false; }
bool operator() (const UInt128 &, const String &) const { return false; }
bool operator() (const UInt128 &, const Array &) const { return false; }
bool operator() (const UInt128 &, const Tuple &) const { return false; }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const Int64 & l, const UInt64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Int64 &, const UInt128) const { return false; }
bool operator() (const Int64 & l, const Int64 & r) const { return l == r; }
bool operator() (const Int64 & l, const Float64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Int64 &, const String &) const { return false; }
bool operator() (const Int64 &, const Array &) const { return false; }
bool operator() (const Int64 &, const Tuple &) const { return false; }
bool operator() (const Int64 &, const String &) const { return false; }
bool operator() (const Int64 &, const Array &) const { return false; }
bool operator() (const Int64 &, const Tuple &) const { return false; }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 & l, const UInt64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Float64 &, const UInt128) const { return false; }
bool operator() (const Float64 & l, const Int64 & r) const { return accurate::equalsOp(l, r); }
bool operator() (const Float64 & l, const Float64 & r) const { return l == r; }
bool operator() (const Float64 &, const String &) const { return false; }
bool operator() (const Float64 &, const Array &) const { return false; }
bool operator() (const Float64 &, const Tuple &) const { return false; }
bool operator() (const Float64 &, const String &) const { return false; }
bool operator() (const Float64 &, const Array &) const { return false; }
bool operator() (const Float64 &, const Tuple &) const { return false; }
bool operator() (const String &, const Null &) const { return false; }
bool operator() (const String &, const UInt64 &) const { return false; }
bool operator() (const String &, const UInt128 &) const { return false; }
bool operator() (const String &, const Int64 &) const { return false; }
bool operator() (const String &, const Float64 &) const { return false; }
bool operator() (const String & l, const String & r) const { return l == r; }
@ -226,6 +255,7 @@ public:
bool operator() (const Array &, const Null &) const { return false; }
bool operator() (const Array &, const UInt64 &) const { return false; }
bool operator() (const Array &, const UInt128 &) const { return false; }
bool operator() (const Array &, const Int64 &) const { return false; }
bool operator() (const Array &, const Float64 &) const { return false; }
bool operator() (const Array &, const String &) const { return false; }
@ -234,6 +264,7 @@ public:
bool operator() (const Tuple &, const Null &) const { return false; }
bool operator() (const Tuple &, const UInt64 &) const { return false; }
bool operator() (const Tuple &, const UInt128 &) const { return false; }
bool operator() (const Tuple &, const Int64 &) const { return false; }
bool operator() (const Tuple &, const Float64 &) const { return false; }
bool operator() (const Tuple &, const String &) const { return false; }
@ -247,45 +278,60 @@ public:
bool operator() (const Null &, const Null &) const { return false; }
bool operator() (const Null &, const UInt64 &) const { return true; }
bool operator() (const Null &, const Int64 &) const { return true; }
bool operator() (const Null &, const UInt128 &) const { return true; }
bool operator() (const Null &, const Float64 &) const { return true; }
bool operator() (const Null &, const String &) const { return true; }
bool operator() (const Null &, const Array &) const { return true; }
bool operator() (const Null &, const Tuple &) const { return true; }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 &, const Null &) const { return false; }
bool operator() (const UInt64 & l, const UInt64 & r) const { return l < r; }
bool operator() (const UInt64 &, const UInt128 &) const { return true; }
bool operator() (const UInt64 & l, const Int64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const UInt64 & l, const Float64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const UInt64 &, const String &) const { return true; }
bool operator() (const UInt64 &, const Array &) const { return true; }
bool operator() (const UInt64 &, const Tuple &) const { return true; }
bool operator() (const UInt64 &, const String &) const { return true; }
bool operator() (const UInt64 &, const Array &) const { return true; }
bool operator() (const UInt64 &, const Tuple &) const { return true; }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const UInt128 &, const Null &) const { return false; }
bool operator() (const UInt128 &, const UInt64) const { return false; }
bool operator() (const UInt128 & l, const UInt128 & r) const { return l < r; }
bool operator() (const UInt128 &, const Int64) const { return false; }
bool operator() (const UInt128 &, const Float64) const { return false; }
bool operator() (const UInt128 &, const String &) const { return false; }
bool operator() (const UInt128 &, const Array &) const { return false; }
bool operator() (const UInt128 &, const Tuple &) const { return false; }
bool operator() (const Int64 &, const Null &) const { return false; }
bool operator() (const Int64 & l, const UInt64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Int64 &, const UInt128 &) const { return false; }
bool operator() (const Int64 & l, const Int64 & r) const { return l < r; }
bool operator() (const Int64 & l, const Float64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Int64 &, const String &) const { return true; }
bool operator() (const Int64 &, const Array &) const { return true; }
bool operator() (const Int64 &, const Tuple &) const { return true; }
bool operator() (const Int64 &, const String &) const { return true; }
bool operator() (const Int64 &, const Array &) const { return true; }
bool operator() (const Int64 &, const Tuple &) const { return true; }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 &, const Null &) const { return false; }
bool operator() (const Float64 & l, const UInt64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Float64, const UInt128 &) const { return false; }
bool operator() (const Float64 & l, const Int64 & r) const { return accurate::lessOp(l, r); }
bool operator() (const Float64 & l, const Float64 & r) const { return l < r; }
bool operator() (const Float64 &, const String &) const { return true; }
bool operator() (const Float64 &, const Array &) const { return true; }
bool operator() (const Float64 &, const Tuple &) const { return true; }
bool operator() (const Float64 &, const String &) const { return true; }
bool operator() (const Float64 &, const Array &) const { return true; }
bool operator() (const Float64 &, const Tuple &) const { return true; }
bool operator() (const String &, const Null &) const { return false; }
bool operator() (const String &, const UInt64 &) const { return false; }
bool operator() (const String &, const Int64 &) const { return false; }
bool operator() (const String &, const Float64 &) const { return false; }
bool operator() (const String &, const Null &) const { return false; }
bool operator() (const String &, const UInt64 &) const { return false; }
bool operator() (const String &, const UInt128 &) const { return false; }
bool operator() (const String &, const Int64 &) const { return false; }
bool operator() (const String &, const Float64 &) const { return false; }
bool operator() (const String & l, const String & r) const { return l < r; }
bool operator() (const String &, const Array &) const { return true; }
bool operator() (const String &, const Tuple &) const { return true; }
bool operator() (const String &, const Array &) const { return true; }
bool operator() (const String &, const Tuple &) const { return true; }
bool operator() (const Array &, const Null &) const { return false; }
bool operator() (const Array &, const UInt64 &) const { return false; }
bool operator() (const Array &, const UInt128 &) const { return false; }
bool operator() (const Array &, const Int64 &) const { return false; }
bool operator() (const Array &, const Float64 &) const { return false; }
bool operator() (const Array &, const String &) const { return false; }
@ -294,6 +340,7 @@ public:
bool operator() (const Tuple &, const Null &) const { return false; }
bool operator() (const Tuple &, const UInt64 &) const { return false; }
bool operator() (const Tuple &, const UInt128 &) const { return false; }
bool operator() (const Tuple &, const Int64 &) const { return false; }
bool operator() (const Tuple &, const Float64 &) const { return false; }
bool operator() (const Tuple &, const String &) const { return false; }
@ -318,6 +365,7 @@ public:
bool operator() (Null &) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (String &) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array &) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
bool operator() (UInt128 &) const { throw Exception("Cannot sum UUIDs", ErrorCodes::LOGICAL_ERROR); }
};
}

View File

@ -54,7 +54,7 @@ void Block::insert(size_t position, const ColumnWithTypeAndName & elem)
if (name_pos.second >= position)
++name_pos.second;
index_by_name[elem.name] = position;
index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, elem);
}
@ -68,20 +68,20 @@ void Block::insert(size_t position, ColumnWithTypeAndName && elem)
if (name_pos.second >= position)
++name_pos.second;
index_by_name[elem.name] = position;
index_by_name.emplace(elem.name, position);
data.emplace(data.begin() + position, std::move(elem));
}
void Block::insert(const ColumnWithTypeAndName & elem)
{
index_by_name[elem.name] = data.size();
index_by_name.emplace(elem.name, data.size());
data.emplace_back(elem);
}
void Block::insert(ColumnWithTypeAndName && elem)
{
index_by_name[elem.name] = data.size();
index_by_name.emplace(elem.name, data.size());
data.emplace_back(std::move(elem));
}

View File

@ -18,6 +18,7 @@ namespace DB
namespace ErrorCodes
{
extern const int EMPTY_DATA_PASSED;
extern const int NOT_IMPLEMENTED;
}
@ -34,6 +35,11 @@ DataTypePtr FieldToDataType::operator() (const UInt64 & x) const
return std::make_shared<DataTypeUInt64>();
}
DataTypePtr FieldToDataType::operator() (const UInt128 &) const
{
throw Exception("There are no UInt128 literals in SQL", ErrorCodes::NOT_IMPLEMENTED);
}
DataTypePtr FieldToDataType::operator() (const Int64 & x) const
{
if (x <= std::numeric_limits<Int8>::max() && x >= std::numeric_limits<Int8>::min()) return std::make_shared<DataTypeInt8>();

View File

@ -19,6 +19,7 @@ class FieldToDataType : public StaticVisitor<DataTypePtr>
public:
DataTypePtr operator() (const Null & x) const;
DataTypePtr operator() (const UInt64 & x) const;
DataTypePtr operator() (const UInt128 & x) const;
DataTypePtr operator() (const Int64 & x) const;
DataTypePtr operator() (const Float64 & x) const;
DataTypePtr operator() (const String & x) const;

View File

@ -41,7 +41,7 @@ generate_function_register(Array
FunctionArrayEnumerate
FunctionArrayEnumerateUniq
FunctionArrayUniq
FunctionArrayDistinct
FunctionArrayDistinct
FunctionEmptyArrayUInt8
FunctionEmptyArrayUInt16
FunctionEmptyArrayUInt32
@ -91,7 +91,7 @@ list(REMOVE_ITEM clickhouse_functions_headers IFunction.h FunctionFactory.h Func
add_library(clickhouse_functions ${clickhouse_functions_sources})
target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES} ${MURMURHASH2_LIBRARIES})
target_link_libraries(clickhouse_functions PUBLIC dbms PRIVATE libconsistent-hashing ${FARMHASH_LIBRARIES} ${METROHASH_LIBRARIES} murmurhash)
target_include_directories (clickhouse_functions SYSTEM BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR})

View File

@ -60,7 +60,7 @@ add_check (hashing_read_buffer)
add_executable (io_operators operators.cpp)
target_link_libraries (io_operators clickhouse_common_io)
if (NOT APPLE AND NOT ARCH_FREEBSD)
if (OS_LINUX)
add_executable(write_buffer_aio write_buffer_aio.cpp)
target_link_libraries (write_buffer_aio clickhouse_common_io ${Boost_FILESYSTEM_LIBRARY})

View File

@ -1,5 +1,5 @@
if (ARCH_FREEBSD)
if (OS_FREEBSD)
set (PATH_SHARE "/usr/local/share" CACHE STRING "")
else ()
set (PATH_SHARE "/usr/share" CACHE STRING "")

View File

@ -127,11 +127,7 @@ String Cluster::Address::toStringFull() const
Clusters::Clusters(Poco::Util::AbstractConfiguration & config, const Settings & settings, const String & config_name)
{
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_name, config_keys);
for (const auto & key : config_keys)
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_name + "." + key));
updateClusters(config, settings, config_name);
}
@ -158,19 +154,9 @@ void Clusters::updateClusters(Poco::Util::AbstractConfiguration & config, const
std::lock_guard lock(mutex);
impl.clear();
for (const auto & key : config_keys)
{
auto it = impl.find(key);
auto new_cluster = std::make_shared<Cluster>(config, settings, config_name + "." + key);
if (it == impl.end())
impl.emplace(key, std::move(new_cluster));
else
{
//TODO: Check that cluster update is necessarily
it->second = std::move(new_cluster);
}
}
impl.emplace(key, std::make_shared<Cluster>(config, settings, config_name + "." + key));
}
Clusters::Impl Clusters::getContainer() const

View File

@ -1,59 +0,0 @@
#include <Interpreters/ClusterProxy/DescribeStreamFactory.h>
#include <Interpreters/InterpreterDescribeQuery.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/BlockExtraInfoInputStream.h>
#include <DataStreams/RemoteBlockInputStream.h>
namespace DB
{
namespace
{
BlockExtraInfo toBlockExtraInfo(const Cluster::Address & address)
{
BlockExtraInfo block_extra_info;
block_extra_info.host = address.host_name;
block_extra_info.resolved_address = address.getResolvedAddress().toString();
block_extra_info.port = address.port;
block_extra_info.user = address.user;
block_extra_info.is_valid = true;
return block_extra_info;
}
}
namespace ClusterProxy
{
void DescribeStreamFactory::createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res)
{
for (const Cluster::Address & local_address : shard_info.local_addresses)
{
InterpreterDescribeQuery interpreter{query_ast, context};
BlockInputStreamPtr stream = interpreter.execute().in;
/** Materialization is needed, since from remote servers the constants come materialized.
* If you do not do this, different types (Const and non-Const) columns will be produced in different threads,
* And this is not allowed, since all code is based on the assumption that in the block stream all types are the same.
*/
BlockInputStreamPtr materialized_stream = std::make_shared<MaterializingBlockInputStream>(stream);
res.emplace_back(std::make_shared<BlockExtraInfoInputStream>(materialized_stream, toBlockExtraInfo(local_address)));
}
if (shard_info.hasRemoteConnections())
{
auto remote_stream = std::make_shared<RemoteBlockInputStream>(
shard_info.pool, query, InterpreterDescribeQuery::getSampleBlock(), context, nullptr, throttler);
remote_stream->setPoolMode(PoolMode::GET_ALL);
remote_stream->appendExtraInfo();
res.emplace_back(std::move(remote_stream));
}
}
}
}

View File

@ -1,23 +0,0 @@
#pragma once
#include <Interpreters/ClusterProxy/IStreamFactory.h>
namespace DB
{
namespace ClusterProxy
{
class DescribeStreamFactory final : public IStreamFactory
{
public:
void createForShard(
const Cluster::ShardInfo & shard_info,
const String & query, const ASTPtr & query_ast,
const Context & context, const ThrottlerPtr & throttler,
BlockInputStreams & res) override;
};
}
}

View File

@ -203,7 +203,7 @@ ExpressionAnalyzer::ExpressionAnalyzer(
const SubqueriesForSets & subqueries_for_set_)
: ast(ast_), context(context_), settings(context.getSettings()),
subquery_depth(subquery_depth_),
source_columns(source_columns_), required_result_columns(required_result_columns_.begin(), required_result_columns_.end()),
source_columns(source_columns_), required_result_columns(required_result_columns_),
storage(storage_),
do_global(do_global_), subqueries_for_sets(subqueries_for_set_)
{
@ -2847,7 +2847,8 @@ void ExpressionAnalyzer::appendProjectResult(ExpressionActionsChain & chain) con
for (size_t i = 0; i < asts.size(); ++i)
{
String result_name = asts[i]->getAliasOrColumnName();
if (required_result_columns.empty() || required_result_columns.count(result_name))
if (required_result_columns.empty()
|| std::find(required_result_columns.begin(), required_result_columns.end(), result_name) != required_result_columns.end())
{
result_columns.emplace_back(asts[i]->getColumnName(), result_name);
step.required_output.push_back(result_columns.back().second);
@ -3393,15 +3394,37 @@ void ExpressionAnalyzer::removeUnneededColumnsFromSelectClause()
if (!select_query)
return;
if (required_result_columns.empty() || select_query->distinct)
if (required_result_columns.empty())
return;
ASTs & elements = select_query->select_expression_list->children;
elements.erase(std::remove_if(elements.begin(), elements.end(), [this](const auto & node)
ASTs new_elements;
new_elements.reserve(elements.size());
/// Some columns may be queried multiple times, like SELECT x, y, y FROM table.
/// In that case we keep them exactly same number of times.
std::map<String, size_t> required_columns_with_duplicate_count;
for (const auto & name : required_result_columns)
++required_columns_with_duplicate_count[name];
for (const auto & elem : elements)
{
return !required_result_columns.count(node->getAliasOrColumnName()) && !hasArrayJoin(node);
}), elements.end());
String name = elem->getAliasOrColumnName();
auto it = required_columns_with_duplicate_count.find(name);
if (required_columns_with_duplicate_count.end() != it && it->second)
{
new_elements.push_back(elem);
--it->second;
}
else if (select_query->distinct || hasArrayJoin(elem))
{
new_elements.push_back(elem);
}
}
elements = std::move(new_elements);
}
}

View File

@ -204,7 +204,7 @@ private:
/** If non-empty, ignore all expressions in not from this list.
*/
NameSet required_result_columns;
Names required_result_columns;
/// Columns after ARRAY JOIN, JOIN, and/or aggregation.
NamesAndTypesList aggregated_columns;

View File

@ -228,6 +228,17 @@ void ExternalLoader::reloadFromConfigFiles(const bool throw_on_error, const bool
throw;
}
}
/// erase removed from config loadable objects
std::list<std::string> removed_loadable_objects;
for (const auto & loadable : loadable_objects)
{
const auto & current_config = loadable_objects_defined_in_config[loadable.second.origin];
if (current_config.find(loadable.first) == std::end(current_config))
removed_loadable_objects.emplace_back(loadable.first);
}
for(const auto & name : removed_loadable_objects)
loadable_objects.erase(name);
}
void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const bool throw_on_error,
@ -250,6 +261,8 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
if (force_reload || last_modified > config_last_modified)
{
auto config = config_repository->load(config_path);
loadable_objects_defined_in_config[config_path].clear();
/// Definitions of loadable objects may have changed, recreate all of them
@ -282,7 +295,8 @@ void ExternalLoader::reloadFromConfigFile(const std::string & config_path, const
LOG_WARNING(log, config_path << ": " + config_settings.external_name + " name cannot be empty");
continue;
}
loadable_objects_defined_in_config[config_path].emplace(name);
if (!loadable_name.empty() && name != loadable_name)
continue;

View File

@ -5,6 +5,7 @@
#include <mutex>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <chrono>
#include <tuple>
#include <Interpreters/IExternalLoadable.h>
@ -146,6 +147,8 @@ private:
/// Both for loadable_objects and failed_loadable_objects.
std::unordered_map<std::string, std::chrono::system_clock::time_point> update_times;
std::unordered_map<std::string, std::unordered_set<std::string>> loadable_objects_defined_in_config;
pcg64 rnd_engine{randomSeed()};
const Configuration & config;
@ -166,8 +169,8 @@ private:
/// Check objects definitions in config files and reload or/and add new ones if the definition is changed
/// If loadable_name is not empty, load only loadable object with name loadable_name
void reloadFromConfigFiles(bool throw_on_error, bool force_reload = false, const std::string & loadable_name = "");
void reloadFromConfigFile(const std::string & config_path, bool throw_on_error, bool force_reload,
const std::string & loadable_name);
void reloadFromConfigFile(const std::string & config_path, const bool throw_on_error,
const bool force_reload, const std::string & loadable_name);
/// Check config files and update expired loadable objects
void reloadAndUpdate(bool throw_on_error = false);

View File

@ -1,92 +1,16 @@
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterCheckQuery.h>
#include <Storages/IStorage.h>
#include <Parsers/ASTCheckQuery.h>
#include <Storages/StorageDistributed.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/UnionBlockInputStream.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <openssl/sha.h>
#include <deque>
#include <array>
namespace DB
{
namespace ErrorCodes
{
extern const int INVALID_BLOCK_EXTRA_INFO;
extern const int RECEIVED_EMPTY_DATA;
}
namespace
{
/// A helper structure for performing a response to a DESCRIBE TABLE query with a Distributed table.
/// Contains information about the local table that was retrieved from a single replica.
struct TableDescription
{
TableDescription(const Block & block, const BlockExtraInfo & extra_info_)
: extra_info(extra_info_)
{
const auto & name_column = typeid_cast<const ColumnString &>(*block.getByName("name").column);
const auto & type_column = typeid_cast<const ColumnString &>(*block.getByName("type").column);
const auto & default_type_column = typeid_cast<const ColumnString &>(*block.getByName("default_type").column);
const auto & default_expression_column = typeid_cast<const ColumnString &>(*block.getByName("default_expression").column);
size_t row_count = block.rows();
names_with_types.reserve(name_column.byteSize() + type_column.byteSize() + (3 * row_count));
SHA512_CTX ctx;
SHA512_Init(&ctx);
bool is_first = true;
for (size_t i = 0; i < row_count; ++i)
{
const auto & name = name_column.getDataAt(i).toString();
const auto & type = type_column.getDataAt(i).toString();
const auto & default_type = default_type_column.getDataAt(i).toString();
const auto & default_expression = default_expression_column.getDataAt(i).toString();
names_with_types.append(is_first ? "" : ", ");
names_with_types.append(name);
names_with_types.append(" ");
names_with_types.append(type);
SHA512_Update(&ctx, reinterpret_cast<const unsigned char *>(name.data()), name.size());
SHA512_Update(&ctx, reinterpret_cast<const unsigned char *>(type.data()), type.size());
SHA512_Update(&ctx, reinterpret_cast<const unsigned char *>(default_type.data()), default_type.size());
SHA512_Update(&ctx, reinterpret_cast<const unsigned char *>(default_expression.data()), default_expression.size());
is_first = false;
}
SHA512_Final(hash.data(), &ctx);
}
using Hash = std::array<unsigned char, SHA512_DIGEST_LENGTH>;
BlockExtraInfo extra_info;
std::string names_with_types;
Hash hash;
UInt32 structure_class;
};
inline bool operator<(const TableDescription & lhs, const TableDescription & rhs)
{
return lhs.hash < rhs.hash;
}
using TableDescriptions = std::deque<TableDescription>;
}
InterpreterCheckQuery::InterpreterCheckQuery(const ASTPtr & query_ptr_, const Context & context_)
: query_ptr(query_ptr_), context(context_)
{
@ -101,120 +25,14 @@ BlockIO InterpreterCheckQuery::execute()
StoragePtr table = context.getTable(database_name, table_name);
auto distributed_table = dynamic_cast<StorageDistributed *>(&*table);
if (distributed_table != nullptr)
{
/// For tables with the Distributed engine, the CHECK TABLE query sends a DESCRIBE TABLE request to all replicas.
/// The identity of the structures is checked (column names + column types + default types + expressions
/// by default) of the tables that the distributed table looks at.
auto column = ColumnUInt8::create();
column->insert(UInt64(table->checkData()));
result = Block{{ std::move(column), std::make_shared<DataTypeUInt8>(), "result" }};
const auto & settings = context.getSettingsRef();
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(result);
BlockInputStreams streams = distributed_table->describe(context, settings);
streams[0] = std::make_shared<UnionBlockInputStream<StreamUnionMode::ExtraInfo>>(
streams, nullptr, settings.max_distributed_connections);
streams.resize(1);
auto stream_ptr = dynamic_cast<IProfilingBlockInputStream *>(&*streams[0]);
if (stream_ptr == nullptr)
throw Exception("InterpreterCheckQuery: Internal error", ErrorCodes::LOGICAL_ERROR);
auto & stream = *stream_ptr;
/// Get all data from the DESCRIBE TABLE queries.
TableDescriptions table_descriptions;
while (true)
{
if (stream.isCancelledOrThrowIfKilled())
{
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(result);
return res;
}
Block block = stream.read();
if (!block)
break;
BlockExtraInfo info = stream.getBlockExtraInfo();
if (!info.is_valid)
throw Exception("Received invalid block extra info", ErrorCodes::INVALID_BLOCK_EXTRA_INFO);
table_descriptions.emplace_back(block, info);
}
if (table_descriptions.empty())
throw Exception("Received empty data", ErrorCodes::RECEIVED_EMPTY_DATA);
/// Define an equivalence class for each table structure.
std::sort(table_descriptions.begin(), table_descriptions.end());
UInt32 structure_class = 0;
auto it = table_descriptions.begin();
it->structure_class = structure_class;
auto prev = it;
for (++it; it != table_descriptions.end(); ++it)
{
if (*prev < *it)
++structure_class;
it->structure_class = structure_class;
prev = it;
}
/// Construct the result.
MutableColumnPtr status_column = ColumnUInt8::create();
MutableColumnPtr host_name_column = ColumnString::create();
MutableColumnPtr host_address_column = ColumnString::create();
MutableColumnPtr port_column = ColumnUInt16::create();
MutableColumnPtr user_column = ColumnString::create();
MutableColumnPtr structure_class_column = ColumnUInt32::create();
MutableColumnPtr structure_column = ColumnString::create();
/// This value is 1 if the structure is not disposed of anywhere, but 0 otherwise.
UInt8 status_value = (structure_class == 0) ? 1 : 0;
for (const auto & desc : table_descriptions)
{
status_column->insert(static_cast<UInt64>(status_value));
structure_class_column->insert(static_cast<UInt64>(desc.structure_class));
host_name_column->insert(desc.extra_info.host);
host_address_column->insert(desc.extra_info.resolved_address);
port_column->insert(static_cast<UInt64>(desc.extra_info.port));
user_column->insert(desc.extra_info.user);
structure_column->insert(desc.names_with_types);
}
Block block;
block.insert(ColumnWithTypeAndName(std::move(status_column), std::make_shared<DataTypeUInt8>(), "status"));
block.insert(ColumnWithTypeAndName(std::move(host_name_column), std::make_shared<DataTypeString>(), "host_name"));
block.insert(ColumnWithTypeAndName(std::move(host_address_column), std::make_shared<DataTypeString>(), "host_address"));
block.insert(ColumnWithTypeAndName(std::move(port_column), std::make_shared<DataTypeUInt16>(), "port"));
block.insert(ColumnWithTypeAndName(std::move(user_column), std::make_shared<DataTypeString>(), "user"));
block.insert(ColumnWithTypeAndName(std::move(structure_class_column), std::make_shared<DataTypeUInt32>(), "structure_class"));
block.insert(ColumnWithTypeAndName(std::move(structure_column), std::make_shared<DataTypeString>(), "structure"));
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(block);
return res;
}
else
{
auto column = ColumnUInt8::create();
column->insert(UInt64(table->checkData()));
result = Block{{ std::move(column), std::make_shared<DataTypeUInt8>(), "result" }};
BlockIO res;
res.in = std::make_shared<OneBlockInputStream>(result);
return res;
}
return res;
}
}

View File

@ -11,6 +11,7 @@
#include <Common/typeid_cast.h>
#include <Parsers/queryToString.h>
namespace DB
{
@ -69,7 +70,11 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
ast.list_of_selects->children.at(query_num), context, Names(), to_stage, subquery_depth, true).getSampleBlock();
if (full_result_header_for_current_select.columns() != full_result_header.columns())
throw Exception("Different number of columns in UNION ALL elements", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
throw Exception("Different number of columns in UNION ALL elements:\n"
+ full_result_header.dumpNames()
+ "\nand\n"
+ full_result_header_for_current_select.dumpNames() + "\n",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
required_result_column_names_for_other_selects[query_num].reserve(required_result_column_names.size());
for (const auto & pos : positions_of_required_result_columns)
@ -87,7 +92,7 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
ast.list_of_selects->children.at(query_num), context, current_required_result_column_names, to_stage, subquery_depth, only_analyze));
}
/// Determine structure of result.
/// Determine structure of the result.
if (num_selects == 1)
{
@ -104,7 +109,11 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
for (size_t query_num = 1; query_num < num_selects; ++query_num)
if (headers[query_num].columns() != num_columns)
throw Exception("Different number of columns in UNION ALL elements", ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
throw Exception("Different number of columns in UNION ALL elements:\n"
+ result_header.dumpNames()
+ "\nand\n"
+ headers[query_num].dumpNames() + "\n",
ErrorCodes::UNION_ALL_RESULT_STRUCTURES_MISMATCH);
for (size_t column_num = 0; column_num < num_columns; ++column_num)
{

View File

@ -0,0 +1,44 @@
#include <Common/config.h>
#if USE_RDKAFKA
#include <Storages/Kafka/KafkaSettings.h>
#include <Parsers/ASTCreateQuery.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void KafkaSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)
{
for (const ASTSetQuery::Change & setting : storage_def.settings->changes)
{
#define SET(TYPE, NAME, DEFAULT, DESCRIPTION) \
else if (setting.name == #NAME) NAME.set(setting.value);
if (false) {}
APPLY_FOR_KAFKA_SETTINGS(SET)
else
throw Exception(
"Unknown setting " + setting.name + " for storage " + storage_def.engine->name,
ErrorCodes::BAD_ARGUMENTS);
#undef SET
}
}
else
{
auto settings_ast = std::make_shared<ASTSetQuery>();
settings_ast->is_standalone = false;
storage_def.set(storage_def.settings, settings_ast);
}
}
}
#endif

View File

@ -0,0 +1,43 @@
#pragma once
#include <Common/config.h>
#if USE_RDKAFKA
#include <Poco/Util/AbstractConfiguration.h>
#include <Core/Defines.h>
#include <Core/Types.h>
#include <Interpreters/SettingsCommon.h>
namespace DB
{
class ASTStorage;
/** Settings for the Kafka engine.
* Could be loaded from a CREATE TABLE query (SETTINGS clause).
*/
struct KafkaSettings
{
#define APPLY_FOR_KAFKA_SETTINGS(M) \
M(SettingString, kafka_broker_list, "", "A comma-separated list of brokers for Kafka engine.") \
M(SettingString, kafka_topic_list, "", "A list of Kafka topics.") \
M(SettingString, kafka_group_name, "", "A group of Kafka consumers.") \
M(SettingString, kafka_format, "", "Message format for Kafka engine.") \
M(SettingChar, kafka_row_delimiter, '\0', "The character to be considered as a delimiter in Kafka message.") \
M(SettingString, kafka_schema, "", "Schema identifier (used by schema-based formats) for Kafka engine") \
M(SettingUInt64, kafka_num_consumers, 1, "The number of consumers per table for Kafka engine.")
#define DECLARE(TYPE, NAME, DEFAULT, DESCRIPTION) \
TYPE NAME {DEFAULT};
APPLY_FOR_KAFKA_SETTINGS(DECLARE)
#undef DECLARE
public:
void loadFromQuery(ASTStorage & storage_def);
};
}
#endif

View File

@ -23,7 +23,9 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTInsertQuery.h>
#include <Parsers/ASTLiteral.h>
#include <Storages/StorageKafka.h> // Y_IGNORE
#include <Parsers/ASTCreateQuery.h>
#include <Storages/Kafka/KafkaSettings.h>
#include <Storages/Kafka/StorageKafka.h> // Y_IGNORE
#include <Storages/StorageFactory.h>
#include <IO/ReadBuffer.h>
#include <common/logger_useful.h>
@ -566,93 +568,200 @@ void registerStorageKafka(StorageFactory & factory)
factory.registerStorage("Kafka", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
size_t args_count = engine_args.size();
bool has_settings = args.storage_def->settings;
KafkaSettings kafka_settings;
if (has_settings)
{
kafka_settings.loadFromQuery(*args.storage_def);
}
/** Arguments of engine is following:
* - Kafka broker list
* - List of topics
* - Group ID (may be a constaint expression with a string result)
* - Message format (string)
* - Row delimiter
* - Schema (optional, if the format supports it)
* - Number of consumers
*/
if (engine_args.size() < 3 || engine_args.size() > 7)
throw Exception(
"Storage Kafka requires 3-7 parameters"
" - Kafka broker list, list of topics to consume, consumer group ID, message format, row delimiter, schema, number of consumers",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Check arguments and settings
#define CHECK_KAFKA_STORAGE_ARGUMENT(ARG_NUM, PAR_NAME) \
/* One of the four required arguments is not specified */ \
if (args_count < ARG_NUM && ARG_NUM <= 4 && \
!kafka_settings.PAR_NAME.changed) \
{ \
throw Exception( \
"Required parameter '" #PAR_NAME "' " \
"for storage Kafka not specified", \
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH); \
} \
/* The same argument is given in two places */ \
if (has_settings && \
kafka_settings.PAR_NAME.changed && \
args_count >= ARG_NUM) \
{ \
throw Exception( \
"The argument №" #ARG_NUM " of storage Kafka " \
"and the parameter '" #PAR_NAME "' " \
"in SETTINGS cannot be specified at the same time", \
ErrorCodes::BAD_ARGUMENTS); \
}
CHECK_KAFKA_STORAGE_ARGUMENT(1, kafka_broker_list)
CHECK_KAFKA_STORAGE_ARGUMENT(2, kafka_topic_list)
CHECK_KAFKA_STORAGE_ARGUMENT(3, kafka_group_name)
CHECK_KAFKA_STORAGE_ARGUMENT(4, kafka_format)
CHECK_KAFKA_STORAGE_ARGUMENT(5, kafka_row_delimiter)
CHECK_KAFKA_STORAGE_ARGUMENT(6, kafka_schema)
CHECK_KAFKA_STORAGE_ARGUMENT(7, kafka_num_consumers)
#undef CHECK_KAFKA_STORAGE_ARGUMENT
// Get and check broker list
String brokers;
auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
if (ast && ast->value.getType() == Field::Types::String)
brokers = safeGet<String>(ast->value);
else
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
if (args_count >= 1)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[0].get());
if (ast && ast->value.getType() == Field::Types::String)
{
brokers = safeGet<String>(ast->value);
}
else
{
throw Exception(String("Kafka broker list must be a string"), ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_broker_list.changed)
{
brokers = kafka_settings.kafka_broker_list.value;
}
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
// Get and check topic list
String topic_list;
if (args_count >= 2)
{
engine_args[1] = evaluateConstantExpressionAsLiteral(engine_args[1], args.local_context);
topic_list = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
}
else if (kafka_settings.kafka_topic_list.changed)
{
topic_list = kafka_settings.kafka_topic_list.value;
}
Names topics;
boost::split(topics, topic_list , [](char c){ return c == ','; });
for (String & topic : topics)
{
boost::trim(topic);
}
// Get and check group name
String group;
if (args_count >= 3)
{
engine_args[2] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[2], args.local_context);
group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
}
else if (kafka_settings.kafka_group_name.changed)
{
group = kafka_settings.kafka_group_name.value;
}
// Get and check message format name
String format;
if (args_count >= 4)
{
engine_args[3] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[3], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
if (ast && ast->value.getType() == Field::Types::String)
{
format = safeGet<String>(ast->value);
}
else
{
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_format.changed)
{
format = kafka_settings.kafka_format.value;
}
// Parse row delimiter (optional)
char row_delimiter = '\0';
if (engine_args.size() >= 5)
if (args_count >= 5)
{
engine_args[4] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[4].get());
String arg;
if (ast && ast->value.getType() == Field::Types::String)
{
arg = safeGet<String>(ast->value);
}
else
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
if (arg.size() > 1)
{
throw Exception("Row delimiter must be a char", ErrorCodes::BAD_ARGUMENTS);
}
else if (arg.size() == 0)
{
row_delimiter = '\0';
}
else
{
row_delimiter = arg[0];
}
}
else if (kafka_settings.kafka_row_delimiter.changed)
{
row_delimiter = kafka_settings.kafka_row_delimiter.value;
}
// Parse format schema if supported (optional)
String schema;
if (engine_args.size() >= 6)
if (args_count >= 6)
{
engine_args[5] = evaluateConstantExpressionOrIdentifierAsLiteral(engine_args[4], args.local_context);
auto ast = typeid_cast<const ASTLiteral *>(engine_args[5].get());
if (ast && ast->value.getType() == Field::Types::String)
{
schema = safeGet<String>(ast->value);
}
else
{
throw Exception("Format schema must be a string", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_schema.changed)
{
schema = kafka_settings.kafka_schema.value;
}
// Parse number of consumers (optional)
UInt64 num_consumers = 1;
if (engine_args.size() >= 7)
if (args_count >= 7)
{
auto ast = typeid_cast<const ASTLiteral *>(engine_args[6].get());
if (ast && ast->value.getType() == Field::Types::UInt64)
{
num_consumers = safeGet<UInt64>(ast->value);
}
else
{
throw Exception("Number of consumers must be a positive integer", ErrorCodes::BAD_ARGUMENTS);
}
}
else if (kafka_settings.kafka_num_consumers.changed)
{
num_consumers = kafka_settings.kafka_num_consumers.value;
}
// Parse topic list
Names topics;
String topic_arg = static_cast<const ASTLiteral &>(*engine_args[1]).value.safeGet<String>();
boost::split(topics, topic_arg , [](char c){ return c == ','; });
for(String & topic : topics)
boost::trim(topic);
// Parse consumer group
String group = static_cast<const ASTLiteral &>(*engine_args[2]).value.safeGet<String>();
// Parse format from string
String format;
ast = typeid_cast<const ASTLiteral *>(engine_args[3].get());
if (ast && ast->value.getType() == Field::Types::String)
format = safeGet<String>(ast->value);
else
throw Exception("Format must be a string", ErrorCodes::BAD_ARGUMENTS);
return StorageKafka::create(
args.table_name, args.database_name, args.context, args.columns,

View File

@ -1784,6 +1784,21 @@ size_t MergeTreeData::getMaxPartsCountForPartition() const
}
std::optional<Int64> MergeTreeData::getMinPartDataVersion() const
{
std::lock_guard lock(data_parts_mutex);
std::optional<Int64> result;
for (const DataPartPtr & part : getDataPartsStateRange(DataPartState::Committed))
{
if (!result || *result > part->info.getDataVersion())
result = part->info.getDataVersion();
}
return result;
}
void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event *until) const
{
const size_t parts_count = getMaxPartsCountForPartition();

View File

@ -384,6 +384,10 @@ public:
size_t getMaxPartsCountForPartition() const;
/// Get min value of part->info.getDataVersion() for all active parts.
/// Makes sense only for ordinary MergeTree engines because for them block numbering doesn't depend on partition.
std::optional<Int64> getMinPartDataVersion() const;
/// If the table contains too many active parts, sleep for a while to give them time to merge.
/// If until is non-null, wake up from the sleep earlier if the event happened.
void delayInsertOrThrowIfNeeded(Poco::Event * until = nullptr) const;

View File

@ -21,6 +21,8 @@ struct MergeTreeMutationEntry
/// Create a new entry and write it to a temporary file.
MergeTreeMutationEntry(MutationCommands commands_, const String & path_prefix_, Int64 tmp_number);
MergeTreeMutationEntry(const MergeTreeMutationEntry &) = delete;
MergeTreeMutationEntry(MergeTreeMutationEntry &&) = default;
/// Commit entry and rename it to a permanent file.
void commit(Int64 block_number_);

View File

@ -139,7 +139,11 @@ struct MergeTreeSettings
* instead of ordinary ones (dozens KB). \
* Before enabling check that all replicas support new format. \
*/ \
M(SettingBool, use_minimalistic_checksums_in_zookeeper, true)
M(SettingBool, use_minimalistic_checksums_in_zookeeper, true) \
\
/** How many records about mutations that are done to keep. \
* If zero, then keep all of them */ \
M(SettingUInt64, finished_mutations_to_keep, 100)
/// Settings that should not change after the creation of a table.
#define APPLY_FOR_IMMUTABLE_MERGE_TREE_SETTINGS(M) \

View File

@ -59,6 +59,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
{
clearOldLogs();
clearOldBlocks();
clearOldMutations();
}
}
@ -236,4 +237,63 @@ void ReplicatedMergeTreeCleanupThread::getBlocksSortedByTime(zkutil::ZooKeeper &
std::sort(timed_blocks.begin(), timed_blocks.end(), NodeWithStat::greaterByTime);
}
void ReplicatedMergeTreeCleanupThread::clearOldMutations()
{
if (!storage.data.settings.finished_mutations_to_keep)
return;
if (storage.queue.countFinishedMutations() <= storage.data.settings.finished_mutations_to_keep)
{
/// Not strictly necessary, but helps to avoid unnecessary ZooKeeper requests.
/// If even this replica hasn't finished enough mutations yet, then we don't need to clean anything.
return;
}
auto zookeeper = storage.getZooKeeper();
zkutil::Stat replicas_stat;
Strings replicas = zookeeper->getChildren(storage.zookeeper_path + "/replicas", &replicas_stat);
UInt64 min_pointer = std::numeric_limits<UInt64>::max();
for (const String & replica : replicas)
{
String pointer;
zookeeper->tryGet(storage.zookeeper_path + "/replicas/" + replica + "/mutation_pointer", pointer);
if (pointer.empty())
return; /// One replica hasn't done anything yet so we can't delete any mutations.
min_pointer = std::min(parse<UInt64>(pointer), min_pointer);
}
Strings entries = zookeeper->getChildren(storage.zookeeper_path + "/mutations");
std::sort(entries.begin(), entries.end());
/// Do not remove entries that are greater than `min_pointer` (they are not done yet).
entries.erase(std::upper_bound(entries.begin(), entries.end(), padIndex(min_pointer)), entries.end());
/// Do not remove last `storage.data.settings.finished_mutations_to_keep` entries.
if (entries.size() <= storage.data.settings.finished_mutations_to_keep)
return;
entries.erase(entries.end() - storage.data.settings.finished_mutations_to_keep, entries.end());
if (entries.empty())
return;
zkutil::Requests ops;
size_t batch_start_i = 0;
for (size_t i = 0; i < entries.size(); ++i)
{
ops.emplace_back(zkutil::makeRemoveRequest(storage.zookeeper_path + "/mutations/" + entries[i], -1));
if (ops.size() > 4 * zkutil::MULTI_BATCH_SIZE || i + 1 == entries.size())
{
/// Simultaneously with clearing the log, we check to see if replica was added since we received replicas list.
ops.emplace_back(zkutil::makeCheckRequest(storage.zookeeper_path + "/replicas", replicas_stat.version));
zookeeper->multi(ops);
LOG_DEBUG(log, "Removed " << (i + 1 - batch_start_i) << " old mutation entries: " << entries[batch_start_i] << " - " << entries[i]);
batch_start_i = i + 1;
ops.clear();
}
}
}
}

View File

@ -50,6 +50,9 @@ private:
/// Remove old block hashes from ZooKeeper. This is done by the leader replica.
void clearOldBlocks();
/// Remove old mutations that are done from ZooKeeper. This is done by the leader replica.
void clearOldMutations();
using NodeCTimeCache = std::map<String, Int64>;
NodeCTimeCache cached_block_stats;

View File

@ -1031,6 +1031,45 @@ bool ReplicatedMergeTreeQueue::processEntry(
}
size_t ReplicatedMergeTreeQueue::countMergesAndPartMutations() const
{
std::lock_guard lock(state_mutex);
size_t count = 0;
for (const auto & entry : queue)
if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS
|| entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
++count;
return count;
}
size_t ReplicatedMergeTreeQueue::countMutations() const
{
std::lock_guard lock(state_mutex);
return mutations_by_znode.size();
}
size_t ReplicatedMergeTreeQueue::countFinishedMutations() const
{
std::lock_guard lock(state_mutex);
size_t count = 0;
for (const auto & pair : mutations_by_znode)
{
const auto & mutation = pair.second;
if (!mutation.is_done)
break;
++count;
}
return count;
}
ReplicatedMergeTreeMergePredicate ReplicatedMergeTreeQueue::getMergePredicate(zkutil::ZooKeeperPtr & zookeeper)
{
return ReplicatedMergeTreeMergePredicate(*this, zookeeper);
@ -1124,6 +1163,8 @@ bool ReplicatedMergeTreeQueue::tryFinalizeMutations(zkutil::ZooKeeperPtr zookeep
{
std::lock_guard lock(state_mutex);
mutation_pointer = finished.back()->znode_name;
for (const ReplicatedMergeTreeMutationEntry * entry : finished)
{
auto it = mutations_by_znode.find(entry->znode_name);
@ -1476,27 +1517,6 @@ bool ReplicatedMergeTreeMergePredicate::operator()(
}
size_t ReplicatedMergeTreeMergePredicate::countMergesAndPartMutations() const
{
std::lock_guard lock(queue.state_mutex);
size_t count = 0;
for (const auto & entry : queue.queue)
if (entry->type == ReplicatedMergeTreeLogEntry::MERGE_PARTS
|| entry->type == ReplicatedMergeTreeLogEntry::MUTATE_PART)
++count;
return count;
}
size_t ReplicatedMergeTreeMergePredicate::countMutations() const
{
std::lock_guard lock(queue.state_mutex);
return queue.mutations_by_znode.size();
}
std::optional<Int64> ReplicatedMergeTreeMergePredicate::getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const
{
/// Assigning mutations is easier than assigning merges because mutations appear in the same order as

View File

@ -275,6 +275,15 @@ public:
*/
bool processEntry(std::function<zkutil::ZooKeeperPtr()> get_zookeeper, LogEntryPtr & entry, const std::function<bool(LogEntryPtr &)> func);
/// Count the number of merges and mutations of single parts in the queue.
size_t countMergesAndPartMutations() const;
/// Count the total number of active mutations.
size_t countMutations() const;
/// Count the total number of active mutations that are finished (is_done = true).
size_t countFinishedMutations() const;
ReplicatedMergeTreeMergePredicate getMergePredicate(zkutil::ZooKeeperPtr & zookeeper);
/// Return the version (block number) of the last mutation that we don't need to apply to the part
@ -345,12 +354,6 @@ public:
const MergeTreeData::DataPartPtr & left, const MergeTreeData::DataPartPtr & right,
String * out_reason = nullptr) const;
/// Count the number of merges and mutations of single parts in the queue.
size_t countMergesAndPartMutations() const;
/// Count the total number of active mutations.
size_t countMutations() const;
/// Return nonempty optional if the part can and should be mutated.
/// Returned mutation version number is always the biggest possible.
std::optional<Int64> getDesiredMutationVersion(const MergeTreeData::DataPartPtr & part) const;

View File

@ -30,7 +30,6 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ClusterProxy/executeQuery.h>
#include <Interpreters/ClusterProxy/SelectStreamFactory.h>
#include <Interpreters/ClusterProxy/DescribeStreamFactory.h>
#include <Interpreters/getClusterName.h>
#include <Core/Field.h>
@ -245,7 +244,6 @@ BlockInputStreams StorageDistributed::read(
? QueryProcessingStage::Complete
: QueryProcessingStage::WithMergeableState;
const auto & modified_query_ast = rewriteSelectQuery(
query_info.query, remote_database, remote_table, remote_table_function_ptr);
@ -318,34 +316,6 @@ void StorageDistributed::shutdown()
}
BlockInputStreams StorageDistributed::describe(const Context & context, const Settings & settings)
{
/// Create DESCRIBE TABLE query.
auto cluster = getCluster();
auto describe_query = std::make_shared<ASTDescribeQuery>();
std::string name = remote_database + '.' + remote_table;
auto id = std::make_shared<ASTIdentifier>(name);
auto desc_database = std::make_shared<ASTIdentifier>(remote_database);
auto desc_table = std::make_shared<ASTIdentifier>(remote_table);
id->children.push_back(desc_database);
id->children.push_back(desc_table);
auto table_expression = std::make_shared<ASTTableExpression>();
table_expression->database_and_table_name = id;
describe_query->table_expression = table_expression;
ClusterProxy::DescribeStreamFactory describe_stream_factory;
return ClusterProxy::executeQuery(
describe_stream_factory, cluster, describe_query, context, settings);
}
void StorageDistributed::truncate(const ASTPtr &)
{
std::lock_guard lock(cluster_nodes_mutex);

View File

@ -85,9 +85,6 @@ public:
String getDataPath() const override { return path; }
/// From each replica, get a description of the corresponding local table.
BlockInputStreams describe(const Context & context, const Settings & settings);
const ExpressionActionsPtr & getShardingKeyExpr() const { return sharding_key_expr; }
const String & getShardingKeyColumnName() const { return sharding_key_column_name; }
size_t getShardCount() const;

View File

@ -87,11 +87,19 @@ StoragePtr StorageFactory::get(
name = engine_def.name;
if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by || storage_def->settings)
if (storage_def->settings && !endsWith(name, "MergeTree") && name != "Kafka")
{
throw Exception(
"Engine " + name + " doesn't support SETTINGS clause. "
"Currently only the MergeTree family of engines and Kafka engine supports it",
ErrorCodes::BAD_ARGUMENTS);
}
if ((storage_def->partition_by || storage_def->order_by || storage_def->sample_by)
&& !endsWith(name, "MergeTree"))
{
throw Exception(
"Engine " + name + " doesn't support PARTITION BY, ORDER BY, SAMPLE BY or SETTINGS clauses. "
"Engine " + name + " doesn't support PARTITION BY, ORDER BY or SAMPLE BY clauses. "
"Currently only the MergeTree family of engines supports them", ErrorCodes::BAD_ARGUMENTS);
}

View File

@ -599,6 +599,7 @@ bool StorageMergeTree::backgroundTask()
{
data.clearOldPartsFromFilesystem();
data.clearOldTemporaryDirectories();
clearOldMutations();
}
size_t aio_threshold = context.getSettings().min_bytes_to_use_direct_io;
@ -631,6 +632,46 @@ Int64 StorageMergeTree::getCurrentMutationVersion(
return it->first;
};
void StorageMergeTree::clearOldMutations()
{
if (!data.settings.finished_mutations_to_keep)
return;
std::vector<MergeTreeMutationEntry> mutations_to_delete;
{
std::lock_guard lock(currently_merging_mutex);
if (current_mutations_by_version.size() <= data.settings.finished_mutations_to_keep)
return;
auto begin_it = current_mutations_by_version.begin();
std::optional<Int64> min_version = data.getMinPartDataVersion();
auto end_it = current_mutations_by_version.end();
if (min_version)
end_it = current_mutations_by_version.upper_bound(*min_version);
size_t done_count = std::distance(begin_it, end_it);
if (done_count <= data.settings.finished_mutations_to_keep)
return;
size_t to_delete_count = done_count - data.settings.finished_mutations_to_keep;
auto it = begin_it;
for (size_t i = 0; i < to_delete_count; ++i)
{
mutations_to_delete.push_back(std::move(it->second));
it = current_mutations_by_version.erase(it);
}
}
for (auto & mutation : mutations_to_delete)
{
LOG_TRACE(log, "Removing mutation: " << mutation.file_name);
mutation.removeFile();
}
}
void StorageMergeTree::clearColumnInPartition(const ASTPtr & partition, const Field & column_name, const Context & context)
{

View File

@ -140,6 +140,8 @@ private:
const MergeTreeData::DataPartPtr & part,
std::lock_guard<std::mutex> & /* currently_merging_mutex_lock */) const;
void clearOldMutations();
friend class MergeTreeBlockOutputStream;
friend class MergeTreeData;
friend struct CurrentlyMergingPartsTagger;

View File

@ -2197,7 +2197,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
/// If many merges is already queued, then will queue only small enough merges.
/// Otherwise merge queue could be filled with only large merges,
/// and in the same time, many small parts could be created and won't be merged.
size_t merges_and_mutations_queued = merge_pred.countMergesAndPartMutations();
size_t merges_and_mutations_queued = queue.countMergesAndPartMutations();
if (merges_and_mutations_queued >= data.settings.max_replicated_merges_in_queue)
{
LOG_TRACE(log, "Number of queued merges and part mutations (" << merges_and_mutations_queued
@ -2216,7 +2216,7 @@ void StorageReplicatedMergeTree::mergeSelectingTask()
{
success = createLogEntryToMergeParts(zookeeper, future_merged_part.parts, future_merged_part.name, deduplicate);
}
else if (merge_pred.countMutations() > 0)
else if (queue.countMutations() > 0)
{
/// Choose a part to mutate.

View File

@ -1,6 +1,5 @@
import os.path as p
import time
import datetime
import pytest
from helpers.cluster import ClickHouseCluster
@ -10,9 +9,11 @@ import json
import subprocess
cluster = ClickHouseCluster(__file__)
instance = cluster.add_instance('instance', main_configs=['configs/kafka.xml'], with_kafka = True)
instance = cluster.add_instance('instance',
main_configs=['configs/kafka.xml'],
with_kafka=True)
@pytest.fixture(scope="module")
def started_cluster():
@ -25,23 +26,36 @@ def started_cluster():
finally:
cluster.shutdown()
def kafka_is_available(started_cluster):
p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-broker-api-versions', '--bootstrap-server', 'PLAINTEXT://localhost:9092'), stdout=subprocess.PIPE)
streamdata = p.communicate()[0]
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
'/usr/bin/kafka-broker-api-versions',
'--bootstrap-server',
'PLAINTEXT://localhost:9092'),
stdout=subprocess.PIPE)
p.communicate()[0]
return p.returncode == 0
def kafka_produce(started_cluster, topic, messages):
p = subprocess.Popen(('docker', 'exec', '-i', started_cluster.kafka_docker_id, '/usr/bin/kafka-console-producer', '--broker-list', 'localhost:9092', '--topic', topic), stdin=subprocess.PIPE)
p = subprocess.Popen(('docker',
'exec',
'-i',
started_cluster.kafka_docker_id,
'/usr/bin/kafka-console-producer',
'--broker-list',
'localhost:9092',
'--topic',
topic),
stdin=subprocess.PIPE)
p.communicate(messages)
p.stdin.close()
def test_kafka_json(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('kafka1:9092', 'json', 'json', 'JSONEachRow', '\\n');
''')
def kafka_check_json_numbers(instance):
retries = 0
while True:
if kafka_is_available(started_cluster):
@ -58,10 +72,38 @@ CREATE TABLE test.kafka (key UInt64, value UInt64)
kafka_produce(started_cluster, 'json', messages)
time.sleep(3)
result = instance.query('SELECT * FROM test.kafka;')
with open(p.join(p.dirname(__file__), 'test_kafka_json.reference')) as reference:
file = p.join(p.dirname(__file__), 'test_kafka_json.reference')
with open(file) as reference:
assert TSV(result) == TSV(reference)
def test_kafka_json(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka('kafka1:9092', 'json', 'json',
'JSONEachRow', '\\n');
''')
kafka_check_json_numbers(instance)
instance.query('DROP TABLE test.kafka')
def test_kafka_json_settings(started_cluster):
instance.query('''
DROP TABLE IF EXISTS test.kafka;
CREATE TABLE test.kafka (key UInt64, value UInt64)
ENGINE = Kafka
SETTINGS
kafka_broker_list = 'kafka1:9092',
kafka_topic_list = 'json'
kafka_group_name = 'json'
kafka_format = 'JSONEachRow'
kafka_row_delimiter = '\\n';
''')
kafka_check_json_numbers(instance)
instance.query('DROP TABLE test.kafka')
if __name__ == '__main__':
cluster.start()
raw_input("Cluster created, press any key to destroy...")

View File

@ -1,2 +1,4 @@
1000
2000
1000 Alice
2000 Alice

View File

@ -23,3 +23,9 @@ UNION ALL
SELECT value AS val FROM data2014 WHERE name = 'Alice')
ORDER BY val ASC;
SELECT val, name FROM
(SELECT value AS val, value AS val_1, name FROM data2013 WHERE name = 'Alice'
UNION ALL
SELECT value AS val, value, name FROM data2014 WHERE name = 'Alice')
ORDER BY val ASC;

View File

@ -0,0 +1,20 @@
#!/usr/bin/env bash
function wait_for_mutation()
{
local table=$1
local mutation_id=$2
for i in {1..100}
do
sleep 0.1
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT is_done FROM system.mutations WHERE table='$table' AND mutation_id='$mutation_id'") -eq 1 ]]; then
break
fi
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for mutation to execute!"
fi
done
}

View File

@ -8,3 +8,6 @@ Query involving aliases should fail on submission
mutation_1.txt DELETE WHERE x = 1 [''] [1] 0 1
mutation_5.txt DELETE WHERE (x % 2) = 1 [''] [5] 0 1
mutation_6.txt DELETE WHERE s = \'d\' [''] [6] 0 1
*** Test mutations cleaner ***
mutation_3.txt DELETE WHERE x = 2 1
mutation_4.txt DELETE WHERE x = 3 1

View File

@ -3,6 +3,8 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
. $CURDIR/00652_mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations(d Date, x UInt32, s String, a UInt32 ALIAS x + 1) ENGINE MergeTree(d, intDiv(x, 10), 8192)"
@ -31,18 +33,8 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations DELETE WHERE s = 'd'"
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
# Wait until all mutations are done.
for i in {1..100}
do
sleep 0.1
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT sum(is_done) FROM system.mutations WHERE table='mutations'") -eq 3 ]]; then
break
fi
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for mutations to execute!"
fi
done
# Wait until the last mutation is done.
wait_for_mutation "mutations" "mutation_6.txt"
# Check that the table contains only the data that should not be deleted.
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.mutations ORDER BY d, x"
@ -50,4 +42,31 @@ ${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.mutations ORDER BY d, x"
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, block_numbers.partition_id, block_numbers.number, parts_to_do, is_done \
FROM system.mutations WHERE table = 'mutations' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_cleaner"
# Create a table with finished_mutations_to_keep = 2
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations_cleaner(x UInt32) ENGINE MergeTree ORDER BY x SETTINGS finished_mutations_to_keep = 2"
# Insert some data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_cleaner(x) VALUES (1), (2), (3), (4)"
# Add some mutations and wait for their execution
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner DELETE WHERE x = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner DELETE WHERE x = 2"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner DELETE WHERE x = 3"
wait_for_mutation "mutations_cleaner" "mutation_4.txt"
# Sleep and then do an INSERT to wakeup the background task that will clean up the old mutations
sleep 1
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_cleaner(x) VALUES (4)"
sleep 0.1
# Check that the first mutation is cleaned
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE table = 'mutations_cleaner' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_cleaner"

View File

@ -7,3 +7,7 @@ Query should fail 2
0000000000 DELETE WHERE x = 1 [] [] 0 1
0000000001 DELETE WHERE (x % 2) = 1 ['200001','200002'] [2,1] 0 1
0000000002 DELETE WHERE s = \'d\' ['200001','200002'] [3,2] 0 1
*** Test mutations cleaner ***
0000000001 DELETE WHERE x = 2 1
0000000002 DELETE WHERE x = 3 1
0000000003 DELETE WHERE x = 4 0

View File

@ -3,6 +3,8 @@
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
. $CURDIR/00652_mergetree_mutations.lib
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_r2"
@ -31,18 +33,8 @@ ${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_r1 DELETE WHERE s = 'd'
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_r1(d, x, s) VALUES \
('2000-01-01', 5, 'e'), ('2000-02-01', 5, 'e')"
# Wait until all mutations are done.
for i in {1..100}
do
sleep 0.1
if [[ $(${CLICKHOUSE_CLIENT} --query="SELECT sum(is_done) FROM system.mutations WHERE table='mutations_r2'") -eq 3 ]]; then
break
fi
if [[ $i -eq 100 ]]; then
echo "Timed out while waiting for mutations to execute!"
fi
done
# Wait until the last mutation is done.
wait_for_mutation "mutations_r2" "0000000002"
# Check that the table contains only the data that should not be deleted.
${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.mutations_r2 ORDER BY d, x"
@ -50,5 +42,44 @@ ${CLICKHOUSE_CLIENT} --query="SELECT * FROM test.mutations_r2 ORDER BY d, x"
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, block_numbers.partition_id, block_numbers.number, parts_to_do, is_done \
FROM system.mutations WHERE table = 'mutations_r2' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="SELECT '*** Test mutations cleaner ***'"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_cleaner_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE IF EXISTS test.mutations_cleaner_r2"
# Create 2 replicas with finished_mutations_to_keep = 2
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations_cleaner_r1(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/mutations_cleaner', 'r1') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
${CLICKHOUSE_CLIENT} --query="CREATE TABLE test.mutations_cleaner_r2(x UInt32) ENGINE ReplicatedMergeTree('/clickhouse/tables/test/mutations_cleaner', 'r2') ORDER BY x SETTINGS \
finished_mutations_to_keep = 2,
cleanup_delay_period = 1,
cleanup_delay_period_random_add = 0"
# Insert some data
${CLICKHOUSE_CLIENT} --query="INSERT INTO test.mutations_cleaner_r1(x) VALUES (1), (2), (3), (4)"
# Add some mutations and wait for their execution
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner_r1 DELETE WHERE x = 1"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner_r1 DELETE WHERE x = 2"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner_r1 DELETE WHERE x = 3"
wait_for_mutation "mutations_cleaner_r2" "0000000002"
# Add another mutation and prevent its execution on the second replica
${CLICKHOUSE_CLIENT} --query="SYSTEM STOP REPLICATION QUEUES test.mutations_cleaner_r2"
${CLICKHOUSE_CLIENT} --query="ALTER TABLE test.mutations_cleaner_r1 DELETE WHERE x = 4"
# Sleep for more than cleanup_delay_period
sleep 1.5
# Check that the first mutation is cleaned
${CLICKHOUSE_CLIENT} --query="SELECT mutation_id, command, is_done FROM system.mutations WHERE table = 'mutations_cleaner_r2' ORDER BY mutation_id"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_r2"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_cleaner_r1"
${CLICKHOUSE_CLIENT} --query="DROP TABLE test.mutations_cleaner_r2"

View File

@ -0,0 +1,6 @@
1
0
0
0
1
1

View File

@ -0,0 +1,21 @@
USE test;
CREATE TABLE IF NOT EXISTS uuid
(
created_at DateTime,
id UUID
)
ENGINE = MergeTree
PARTITION BY toDate(created_at)
ORDER BY (created_at, id);
INSERT INTO uuid (created_at, id) VALUES ('2018-01-01 01:02:03', '00000000-0000-03f8-9cb8-cb1b82fb3900');
SELECT count() FROM uuid WHERE id = '00000000-0000-03f8-9cb8-cb1b82fb3900';
SELECT count() FROM uuid WHERE id != '00000000-0000-03f8-9cb8-cb1b82fb3900';
SELECT count() FROM uuid WHERE id < '00000000-0000-03f8-9cb8-cb1b82fb3900';
SELECT count() FROM uuid WHERE id > '00000000-0000-03f8-9cb8-cb1b82fb3900';
SELECT count() FROM uuid WHERE id <= '00000000-0000-03f8-9cb8-cb1b82fb3900';
SELECT count() FROM uuid WHERE id >= '00000000-0000-03f8-9cb8-cb1b82fb3900';
DROP TABLE uuid;

View File

@ -0,0 +1,8 @@
1 2
3 3
1 2
4 4
1 2
3 4
1 2
3 3

View File

@ -0,0 +1,4 @@
SELECT x, y FROM (SELECT x, y FROM (SELECT 1 AS x, 2 AS y) UNION ALL SELECT x, x FROM (SELECT 3 AS x, 4 AS y)) ORDER BY x, y;
SELECT x, y FROM (SELECT x, y FROM (SELECT 1 AS x, 2 AS y) UNION ALL SELECT y, y FROM (SELECT 3 AS x, 4 AS y)) ORDER BY x, y;
SELECT x, y FROM (SELECT x, x, y FROM (SELECT 1 AS x, 2 AS y) UNION ALL SELECT x, y, y FROM (SELECT 3 AS x, 4 AS y)) ORDER BY x, y;
SELECT x, y FROM (SELECT x, y, y FROM (SELECT 1 AS x, 2 AS y) UNION ALL SELECT x, x, y FROM (SELECT 3 AS x, 4 AS y)) ORDER BY x, y;

View File

@ -0,0 +1,14 @@
DROP TABLE IF EXISTS test.test;
CREATE TABLE test.test(x Int32) ENGINE = Log;
INSERT INTO test.test VALUES (123);
SELECT a1
FROM
(
SELECT x AS a1, x AS a2 FROM test.test
UNION ALL
SELECT x, x FROM test.test
);
DROP TABLE test.test;

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (18.8.0) unstable; urgency=low
clickhouse (18.9.0) unstable; urgency=low
* Modified source code
-- <root@yandex-team.ru> Thu, 02 Aug 2018 11:35:38 +0300
-- Alexey Milovidov <milovidov@yandex-team.ru> Fri, 03 Aug 2018 19:17:05 +0300

View File

@ -3,6 +3,7 @@ set -e
CLICKHOUSE_USER=${CLICKHOUSE_USER=clickhouse}
CLICKHOUSE_GROUP=${CLICKHOUSE_GROUP=${CLICKHOUSE_USER}}
CLICKHOUSE_CONFDIR=${CLICKHOUSE_CONFDIR=/etc/clickhouse-server}
CLICKHOUSE_DATADIR=${CLICKHOUSE_DATADIR=/var/lib/clickhouse}
CLICKHOUSE_LOGDIR=${CLICKHOUSE_LOGDIR=/var/log/clickhouse-server}
OS=${OS=`lsb_release -is 2>/dev/null || uname -s || true`}
@ -64,6 +65,10 @@ Please fix this and reinstall this package." >&2
exit 1
fi
if [ -d ${CLICKHOUSE_CONFDIR} ]; then
su -s /bin/sh ${CLICKHOUSE_USER} -c "test -w ${CLICKHOUSE_CONFDIR}" || chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_CONFDIR}
fi
if [ ! -d ${CLICKHOUSE_DATADIR} ]; then
mkdir -p ${CLICKHOUSE_DATADIR}
chown ${CLICKHOUSE_USER}:${CLICKHOUSE_GROUP} ${CLICKHOUSE_DATADIR}

4
debian/rules vendored
View File

@ -70,8 +70,8 @@ override_dh_auto_configure:
override_dh_auto_build:
# Fix for ninja. Do not add -O.
#cd $(BUILDDIR) && $(MAKE) -j$(THREADS_COUNT)
cd $(BUILDDIR) && cmake --build . -- -j$(THREADS_COUNT)
cd $(BUILDDIR) && $(MAKE) -j$(THREADS_COUNT)
#cd $(BUILDDIR) && cmake --build . -- -j$(THREADS_COUNT) # cmake return true on error
override_dh_auto_test:
#TODO, use ENABLE_TESTS=1

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=\*
ARG version=18.9.0
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=\*
ARG version=18.9.0
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb http://repo.yandex.ru/clickhouse/deb/stable/ main/"
ARG version=\*
ARG version=18.9.0
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -46,22 +46,19 @@ export CXX=g++-7
## Install required libraries from packages
```bash
sudo apt-get install libicu-dev libreadline-dev libssl-dev
sudo apt-get install libicu-dev libreadline-dev
```
## Checkout ClickHouse sources
To get the latest stable version:
```bash
git clone -b stable --recursive git@github.com:yandex/ClickHouse.git
# or: git clone -b stable --recursive https://github.com/yandex/ClickHouse.git
git clone --recursive git@github.com:yandex/ClickHouse.git
# or: git clone --recursive https://github.com/yandex/ClickHouse.git
cd ClickHouse
```
For development, switch to the `master` branch.
For the latest release candidate, switch to the `testing` branch.
For the latest stable version, switch to the `stable` branch.
## Build ClickHouse

View File

@ -17,17 +17,14 @@ brew install cmake ninja gcc icu4c mariadb-connector-c openssl libtool gettext r
## Checkout ClickHouse sources
To get the latest stable version:
```bash
git clone -b stable --recursive --depth=10 git@github.com:yandex/ClickHouse.git
# or: git clone -b stable --recursive --depth=10 https://github.com/yandex/ClickHouse.git
git clone --recursive --depth=10 git@github.com:yandex/ClickHouse.git
# or: git clone --recursive --depth=10 https://github.com/yandex/ClickHouse.git
cd ClickHouse
```
For development, switch to the `master` branch.
For the latest release candidate, switch to the `testing` branch.
For the latest stable version, switch to the `stable` branch.
## Build ClickHouse

View File

@ -8,20 +8,41 @@ Kafka lets you:
- Organize fault-tolerant storage.
- Process streams as they become available.
Old format:
```
Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
```
Parameters:
New format:
- `broker_list` A comma-separated list of brokers (`localhost:9092`).
- `topic_list` A list of Kafka topics (`my_topic`).
- `group_name` A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `--format` Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section.
- `schema` An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n'
kafka_schema = '',
kafka_num_consumers = 2
```
Example:
Required parameters:
- `kafka_broker_list` A comma-separated list of brokers (`localhost:9092`).
- `kafka_topic_list` A list of Kafka topics (`my_topic`).
- `kafka_group_name` A group of Kafka consumers (`group1`). Reading margins are tracked for each group separately. If you don't want messages to be duplicated in the cluster, use the same group name everywhere.
- `kafka_format` Message format. Uses the same notation as the SQL ` FORMAT` function, such as ` JSONEachRow`. For more information, see the "Formats" section.
Optional parameters:
- `kafka_row_delimiter` - Character-delimiter of records (rows), which ends the message.
- `kafka_schema` An optional parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `kafka_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient. The total number of consumers should not exceed the number of partitions in the topic, since only one consumer can be assigned per partition.
Examples:
```sql
CREATE TABLE queue (
@ -31,6 +52,24 @@ Example:
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
```
The delivered messages are tracked automatically, so each message in a group is only counted once. If you want to get the data twice, then create a copy of the table with another group name.
@ -59,7 +98,7 @@ Example:
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;

View File

@ -246,6 +246,8 @@ Mutations are totally ordered by their creation order and are applied to each pa
A mutation query returns immediately after the mutation entry is added (in case of replicated tables to ZooKeeper, for nonreplicated tables - to the filesystem). The mutation itself executes asynchronously using the system profile settings. To track the progress of mutations you can use the `system.mutations` table. A mutation that was successfully submitted will continue to execute even if ClickHouse servers are restarted. There is no way to roll back the mutation once it is submitted.
Entries for finished mutations are not deleted right away (the number of preserved entries is determined by the `finished_mutations_to_keep` storage engine parameter). Older mutation entries are deleted.
#### system.mutations table
The table contains information about mutations of MergeTree tables and their progress. Each mutation command is represented by a single row. The table has the following columns:

View File

@ -1,6 +1,6 @@
# Kafka
Движок работает с [Apache Kafka](http://kafka.apache.org/).
Движок работает с [Apache Kafka](http://kafka.apache.org/).
Kafka позволяет:
@ -8,20 +8,40 @@ Kafka позволяет:
- Организовать отказо-устойчивое хранилище.
- Обрабатывать потоки по мере их появления.
Старый формат:
```
Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
Kafka(kafka_broker_list, kafka_topic_list, kafka_group_name, kafka_format
[, kafka_row_delimiter, kafka_schema, kafka_num_consumers])
```
Параметры:
Новый формат:
- `broker_list` - Перечень брокеров, разделенный запятыми (`localhost:9092`).
- `topic_list` - Перечень необходимых топиков Kafka (`my_topic`).
- `group_name` - Группа потребителя Kafka (`group1`). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
- `format` - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение `FORMAT`, например, `JSONEachRow`. Подробнее смотрите в разделе "Форматы".
- `schema` - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap'n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `num_consumers` - Количество потребителей (consumer) на таблицу. По умолчанию `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
```
Kafka SETTINGS
kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic1,topic2',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_row_delimiter = '\n'
kafka_schema = '',
kafka_num_consumers = 2
```
Пример:
Обязательные параметры:
- `kafka_broker_list` - Перечень брокеров, разделенный запятыми (`localhost:9092`).
- `kafka_topic_list` - Перечень необходимых топиков Kafka (`my_topic`).
- `kafka_group_name` - Группа потребителя Kafka (`group1`). Отступы для чтения отслеживаются для каждой группы отдельно. Если необходимо, чтобы сообщения не повторялись на кластере, используйте везде одно имя группы.
- `kafka_format` - Формат сообщений. Имеет те же обозначения, что выдает SQL-выражение `FORMAT`, например, `JSONEachRow`. Подробнее смотрите в разделе "Форматы".
Опциональные параметры:
- `kafka_row_delimiter` - Символ-разделитель записей (строк), которым завершается сообщение.
- `kafka_schema` - Опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Cap'n Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `kafka_num_consumers` - Количество потребителей (consumer) на таблицу. По умолчанию `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна. Общее число потребителей не должно превышать количество партиций в топике, так как на одну партицию может быть назначено не более одного потребителя.
Примеры:
```sql
CREATE TABLE queue (
@ -31,6 +51,24 @@ Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
) ENGINE = Kafka('localhost:9092', 'topic', 'group1', 'JSONEachRow');
SELECT * FROM queue LIMIT 5;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka SETTINGS kafka_broker_list = 'localhost:9092',
kafka_topic_list = 'topic',
kafka_group_name = 'group1',
kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
CREATE TABLE queue2 (
timestamp UInt64,
level String,
message String
) ENGINE = Kafka('localhost:9092', 'topic', 'group1')
SETTINGS kafka_format = 'JSONEachRow',
kafka_num_consumers = 4;
```
Полученные сообщения отслеживаются автоматически, поэтому из одной группы каждое сообщение считывается только один раз. Если необходимо получить данные дважды, то создайте копию таблицы с другим именем группы.
@ -59,7 +97,7 @@ Kafka(broker_list, topic_list, group_name, format[, schema, num_consumers])
level String,
total UInt64
) ENGINE = SummingMergeTree(day, (day, level), 8192);
CREATE MATERIALIZED VIEW consumer TO daily
AS SELECT toDate(toDateTime(timestamp)) AS day, level, count() as total
FROM queue GROUP BY day, level;

View File

@ -245,6 +245,8 @@ ALTER TABLE [db.]table DELETE WHERE expr
Запрос завершается немедленно после добавления информации о мутации (для реплицированных таблиц - в ZooKeeper, для нереплицированных - на файловую систему). Сама мутация выполняется асинхронно, используя настройки системного профиля. Следить за ходом её выполнения можно по таблице `system.mutations`. Добавленные мутации будут выполняться до конца даже в случае перезапуска серверов ClickHouse. Откатить мутацию после её добавления нельзя.
Записи о последних выполненных мутациях удаляются не сразу (количество сохраняемых мутаций определяется параметром движка таблиц `finished_mutations_to_keep`). Более старые записи удаляются.
#### Таблица system.mutations
Таблица содержит информацию о ходе выполнения мутаций MergeTree-таблиц. Каждой команде мутации соответствует одна строка. В таблице есть следующие столбцы:

View File

@ -1,4 +1,4 @@
if (ARCH_FREEBSD OR ARCH_32)
if (OS_FREEBSD OR ARCH_32)
option (USE_INTERNAL_GPERFTOOLS_LIBRARY "Set to FALSE to use system gperftools (tcmalloc) library instead of bundled" OFF)
else ()
option (USE_INTERNAL_GPERFTOOLS_LIBRARY "Set to FALSE to use system gperftools (tcmalloc) library instead of bundled" ${NOT_UNBUNDLED})
@ -13,7 +13,7 @@ if (ENABLE_TCMALLOC)
find_package (Gperftools)
endif ()
if (NOT (GPERFTOOLS_FOUND AND GPERFTOOLS_INCLUDE_DIR AND GPERFTOOLS_TCMALLOC_MINIMAL) AND NOT (ARCH_FREEBSD OR ARCH_32))
if (NOT (GPERFTOOLS_FOUND AND GPERFTOOLS_INCLUDE_DIR AND GPERFTOOLS_TCMALLOC_MINIMAL) AND NOT (OS_FREEBSD OR ARCH_32))
set (USE_INTERNAL_GPERFTOOLS_LIBRARY 1)
set (GPERFTOOLS_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libtcmalloc/include")
set (GPERFTOOLS_TCMALLOC_MINIMAL tcmalloc_minimal_internal)

View File

@ -1,13 +1,20 @@
option (ENABLE_JEMALLOC "Set to TRUE to use jemalloc" ON)
option (USE_INTERNAL_JEMALLOC_LIBRARY "Set to FALSE to use system jemalloc library instead of bundled" ${NOT_UNBUNDLED})
option (ENABLE_JEMALLOC "Set to TRUE to use jemalloc" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_JEMALLOC_LIBRARY "Set to FALSE to use system jemalloc library instead of bundled" ${NOT_UNBUNDLED})
elseif ()
option (USE_INTERNAL_JEMALLOC_LIBRARY "Set to FALSE to use system jemalloc library instead of bundled" OFF)
endif()
if (ENABLE_JEMALLOC)
if (USE_INTERNAL_JEMALLOC_LIBRARY)
set (JEMALLOC_LIBRARIES "jemalloc")
else ()
if (NOT USE_INTERNAL_JEMALLOC_LIBRARY)
find_package (JeMalloc)
endif ()
if (NOT JEMALLOC_LIBRARIES)
set (JEMALLOC_LIBRARIES "jemalloc")
set (USE_INTERNAL_JEMALLOC_LIBRARY 1)
endif ()
if (JEMALLOC_LIBRARIES)
set (USE_JEMALLOC 1)
else ()

View File

@ -132,6 +132,33 @@ const char * __shm_directory(size_t * len)
}
/// https://boringssl.googlesource.com/boringssl/+/ad1907fe73334d6c696c8539646c21b11178f20f%5E!/#F0
/* Copyright (c) 2015, Google Inc.
*
* Permission to use, copy, modify, and/or distribute this software for any
* purpose with or without fee is hereby granted, provided that the above
* copyright notice and this permission notice appear in all copies.
*
* THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
* WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
* MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
* SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
* WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION
* OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF OR IN
* CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
*/
void explicit_bzero(void * buf, size_t len)
{
memset(buf, 0, len);
__asm__ __volatile__("" :: "r"(buf) : "memory");
}
void __explicit_bzero_chk(void * buf, size_t len, size_t unused)
{
return explicit_bzero(buf, len);
}
#if defined (__cplusplus)
}
#endif

View File

@ -1,5 +1,9 @@
option (ENABLE_MYSQL "Enable MySQL" ON)
option (USE_INTERNAL_MYSQL_LIBRARY "Set to FALSE to use system mysqlclient library instead of bundled" ${NOT_UNBUNDLED})
option (ENABLE_MYSQL "Enable MySQL" ${OS_LINUX})
if (OS_LINUX)
option (USE_INTERNAL_MYSQL_LIBRARY "Set to FALSE to use system mysqlclient library instead of bundled" ${NOT_UNBUNDLED})
else ()
option (USE_INTERNAL_MYSQL_LIBRARY "Set to FALSE to use system mysqlclient library instead of bundled" OFF)
endif ()
if (USE_INTERNAL_MYSQL_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/mariadb-connector-c/README")
message (WARNING "submodule contrib/mariadb-connector-c is missing. to fix try run: \n git submodule update --init --recursive")

View File

@ -52,10 +52,6 @@ do
elif [[ $1 == '--version' ]]; then
gen_revision_author $2
exit 0
# who use this?
#elif [[ $1 == '--head' ]]; then
# VERSION_STRING=`git rev-parse HEAD`
# shift
elif [[ $1 == '--pbuilder' ]]; then
# Default
shift
@ -64,7 +60,7 @@ do
shift
elif [[ $1 == '--fast' ]]; then
# Wrong but fast pbuilder mode: create base package with all depends
EXTRAPACKAGES="$EXTRAPACKAGES debhelper cmake ninja-build gcc-7 g++-7 libc6-dev libmariadbclient-dev libicu-dev libltdl-dev libreadline-dev libssl-dev unixodbc-dev psmisc bash expect python python-lxml python-termcolor python-requests curl perl sudo openssl netcat-openbsd"
EXTRAPACKAGES="$EXTRAPACKAGES debhelper cmake ninja-build gcc-7 g++-7 libc6-dev libicu-dev libreadline-dev psmisc bash expect python python-lxml python-termcolor python-requests curl perl sudo openssl netcat-openbsd"
shift
else
echo "Unknown option $1"
@ -77,9 +73,6 @@ if [ -n "$SANITIZER" ]
then
CMAKE_BUILD_TYPE=$SANITIZER
VERSION_POSTFIX+=+${SANITIZER,,}
# todo: нужно ли отключить libtcmalloc?
LIBTCMALLOC_OPTS="-DENABLE_TCMALLOC=0"
# GLIBC_COMPATIBILITY отключен по умолчанию
export DEB_CC=${DEB_CC=clang-6.0}
export DEB_CXX=${DEB_CXX=clang++-6.0}
EXTRAPACKAGES="$EXTRAPACKAGES clang-6.0 lld-6.0"

Some files were not shown because too many files have changed in this diff Show More