Merge branch 'master' into changelog

commit 5351e8ac6a by alexey-milovidov, 2019-07-20 00:48:01 +03:00 (committed via GitHub; GPG key ID: 4AEE18F83AFDEB23)
671 changed files with 26404 additions and 2556 deletions

.gitignore

@ -205,6 +205,7 @@ configure-stamp
*.bin
*.mrk
*.mrk2
.dupload.conf

.gitmodules

@ -67,6 +67,12 @@
[submodule "contrib/libgsasl"]
path = contrib/libgsasl
url = https://github.com/ClickHouse-Extras/libgsasl.git
[submodule "contrib/libcxx"]
path = contrib/libcxx
url = https://github.com/llvm-mirror/libcxx.git
[submodule "contrib/libcxxabi"]
path = contrib/libcxxabi
url = https://github.com/llvm-mirror/libcxxabi.git
[submodule "contrib/snappy"]
path = contrib/snappy
url = https://github.com/google/snappy


@ -1,3 +1,20 @@
## ClickHouse release 19.9.4.1, 2019-07-05
### Bug Fix
* Fix a segfault in the Delta codec that affects columns with values smaller than 32 bits in size. The bug led to random memory corruption (see the SQL sketch after this list). [#5786](https://github.com/yandex/ClickHouse/pull/5786) ([alesapin](https://github.com/alesapin))
* Fix a rare bug in the checking of parts with LowCardinality columns. [#5832](https://github.com/yandex/ClickHouse/pull/5832) ([alesapin](https://github.com/alesapin))
* Fix a segfault in TTL merge when non-physical columns are present in the block. [#5819](https://github.com/yandex/ClickHouse/pull/5819) ([Anton Popov](https://github.com/CurtizJ))
* Fix potential infinite sleeping of low-priority queries. [#5842](https://github.com/yandex/ClickHouse/pull/5842) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fix ClickHouse determining the default time zone as UCT instead of UTC. [#5828](https://github.com/yandex/ClickHouse/pull/5828) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fix a bug where distributed DROP/ALTER/TRUNCATE/OPTIMIZE ON CLUSTER queries were executed on a follower replica before the leader replica. Now they are executed directly on the leader replica. [#5757](https://github.com/yandex/ClickHouse/pull/5757) ([alesapin](https://github.com/alesapin))
* Fix a race condition that could cause some queries not to appear in query_log immediately after a SYSTEM FLUSH LOGS query. [#5685](https://github.com/yandex/ClickHouse/pull/5685) ([Anton Popov](https://github.com/CurtizJ))
* Add missing support for constant arguments to the `evalMLModel` function. [#5820](https://github.com/yandex/ClickHouse/pull/5820) ([alexey-milovidov](https://github.com/alexey-milovidov))
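
A minimal SQL sketch of three of the fixed scenarios above (Delta codec, ON CLUSTER DDL, and the query_log race). All table and cluster names here are hypothetical placeholders, not taken from the linked PRs:

```sql
-- Delta codec on a column narrower than 32 bits: the shape affected by the
-- segfault fix (hypothetical table).
CREATE TABLE t_delta (x UInt16 CODEC(Delta)) ENGINE = MergeTree ORDER BY x;

-- Distributed DDL of the kind covered by the follower/leader fix
-- ('my_cluster' stands for any configured cluster).
TRUNCATE TABLE default.t_delta ON CLUSTER my_cluster;

-- Flush logs synchronously, then read query_log; the fixed race could
-- previously leave a just-finished query missing from this result.
SYSTEM FLUSH LOGS;
SELECT query_id, query FROM system.query_log ORDER BY event_time DESC LIMIT 10;
```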
## ClickHouse release 19.7.6.1, 2019-07-05
### Bug Fix
* Fix a performance regression in some queries with JOIN (an illustrative query shape follows this entry). [#5192](https://github.com/yandex/ClickHouse/pull/5192) ([Winter Zhang](https://github.com/zhang2014))
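
A purely illustrative sketch of the affected query shape; the tables are hypothetical and the PR does not name specific queries:

```sql
SELECT l.id, sum(r.value) AS total
FROM left_table AS l
INNER JOIN right_table AS r ON l.id = r.id
GROUP BY l.id;
```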
## ClickHouse release 19.9.2.4, 2019-06-24
### New Feature


@ -1,3 +1,15 @@
## ClickHouse release 19.9.4.1, 2019-07-05
### Bug Fixes
* Fixed a segmentation fault in the Delta compression codec on columns with values smaller than 32 bits in size. The bug could lead to memory corruption. [#5786](https://github.com/yandex/ClickHouse/pull/5786) ([alesapin](https://github.com/alesapin))
* Fixed a bug in the checking of parts with LowCardinality columns. [#5832](https://github.com/yandex/ClickHouse/pull/5832) ([alesapin](https://github.com/alesapin))
* Fixed a segmentation fault when merging parts with an expired TTL if the block contains columns that are not part of the table structure. [#5819](https://github.com/yandex/ClickHouse/pull/5819) ([Anton Popov](https://github.com/CurtizJ))
* Fixed a possible infinite wait in low-priority queries. [#5842](https://github.com/yandex/ClickHouse/pull/5842) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed an error in determining the default time zone (UCT instead of UTC). [#5828](https://github.com/yandex/ClickHouse/pull/5828) ([alexey-milovidov](https://github.com/alexey-milovidov))
* Fixed a bug in distributed DROP/ALTER/TRUNCATE/OPTIMIZE ON CLUSTER queries. [#5757](https://github.com/yandex/ClickHouse/pull/5757) ([alesapin](https://github.com/alesapin))
* Fixed a race condition in distributed queries that could cause some queries not to appear in query_log immediately after a SYSTEM FLUSH LOGS query. [#5685](https://github.com/yandex/ClickHouse/pull/5685) ([Anton Popov](https://github.com/CurtizJ))
* Added missing support for constant arguments to the `evalMLModel` function. [#5820](https://github.com/yandex/ClickHouse/pull/5820) ([alexey-milovidov](https://github.com/alexey-milovidov))
## ClickHouse release 19.9.2.4, 2019-06-24
### New Features
@ -47,6 +59,11 @@
* Fixed building clickhouse as a submodule. [#5574](https://github.com/yandex/ClickHouse/pull/5574) ([proller](https://github.com/proller))
* Improved the performance test for JSONExtract functions. [#5444](https://github.com/yandex/ClickHouse/pull/5444) ([Vitaly Baranov](https://github.com/vitlibar))
## ClickHouse release 19.7.6.1, 2019-07-05
### Bug Fixes
* Fixed a performance regression in JOIN in some kinds of queries. [#5192](https://github.com/yandex/ClickHouse/pull/5192) ([Winter Zhang](https://github.com/zhang2014))
## ClickHouse release 19.8.3.8, 2019-06-11
### New Features


@ -104,7 +104,6 @@ if (COMPILER_GCC AND CMAKE_CXX_COMPILER_VERSION VERSION_GREATER "8.3.0")
endif ()
if (COMPILER_CLANG)
# clang: warning: argument unused during compilation: '-stdlib=libc++'
# clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument]
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument")
endif ()
@ -183,12 +182,18 @@ else ()
set (CXX_FLAGS_INTERNAL_COMPILER "-std=c++1z")
endif ()
if (COMPILER_GCC OR COMPILER_CLANG)
# Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsized-deallocation")
endif ()
option(WITH_COVERAGE "Build with coverage." 0)
if(WITH_COVERAGE AND COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
endif()
if(WITH_COVERAGE AND COMPILER_GCC)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-arcs -ftest-coverage")
set(COVERAGE_OPTION "-lgcov")
endif()
set (CMAKE_BUILD_COLOR_MAKEFILE ON)
@ -202,16 +207,72 @@ set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${COMPILER_FLAGS} -fn
set (CMAKE_C_FLAGS_RELWITHDEBINFO "${CMAKE_C_FLAGS_RELWITHDEBINFO} -O3 ${CMAKE_C_FLAGS_ADD}")
set (CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -g3 -ggdb3 -fno-inline ${CMAKE_C_FLAGS_ADD}")
include (cmake/use_libcxx.cmake)
# Uses MAKE_STATIC_LIBRARIES
option (UNBUNDLED "Try find all libraries in system. We recommend to avoid this mode for production builds, because we cannot guarantee exact versions and variants of libraries your system has installed. This mode exists for enthusiastic developers who search for trouble. Also it is useful for maintainers of OS packages." OFF)
if (UNBUNDLED)
set(NOT_UNBUNDLED 0)
else ()
set(NOT_UNBUNDLED 1)
endif ()
# Using system libs can cause lot of warnings in includes.
if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
option (NO_WERROR "Disable -Werror compiler option" ON)
endif ()
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package (Threads)
include (cmake/find_cxx.cmake)
include (cmake/test_compiler.cmake)
if (OS_LINUX AND COMPILER_CLANG AND USE_STATIC_LIBRARIES)
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux)" ${HAVE_LIBCXX})
if (USE_LIBCXX)
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build.
endif ()
endif ()
if (USE_LIBCXX)
set (STATIC_STDLIB_FLAGS "")
else ()
set (STATIC_STDLIB_FLAGS "-static-libgcc -static-libstdc++")
endif ()
if (MAKE_STATIC_LIBRARIES AND NOT APPLE AND NOT (COMPILER_CLANG AND OS_FREEBSD))
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} ${STATIC_STDLIB_FLAGS}")
# Along with executables, we also build example of shared library for "library dictionary source"; and it also should be self-contained.
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} ${STATIC_STDLIB_FLAGS}")
endif ()
if (USE_STATIC_LIBRARIES AND HAVE_NO_PIE)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${FLAG_NO_PIE}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAG_NO_PIE}")
endif ()
if (NOT SANITIZE)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-undefined")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--no-undefined")
endif()
include (cmake/find_unwind.cmake)
if (USE_INTERNAL_UNWIND_LIBRARY)
option (USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING "Use internal unwind library for exception handling" ${USE_STATIC_LIBRARIES})
endif ()
# Set standard, system and compiler libraries explicitly.
# This is intended for more control of what we are linking.
string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
set (CMAKE_POSTFIX_VARIABLE "CMAKE_${CMAKE_BUILD_TYPE_UC}_POSTFIX")
set (DEFAULT_LIBS "")
if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING OR USE_LIBCXX))
# Note: this probably has no effect, but I'm not an expert in CMake.
@ -225,6 +286,8 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
set (BUILTINS_LIB_PATH "")
if (COMPILER_CLANG)
execute_process (COMMAND ${CMAKE_CXX_COMPILER} --print-file-name=libclang_rt.builtins-${CMAKE_SYSTEM_PROCESSOR}.a OUTPUT_VARIABLE BUILTINS_LIB_PATH OUTPUT_STRIP_TRAILING_WHITESPACE)
else ()
set (BUILTINS_LIB_PATH "-lgcc")
endif ()
string (TOUPPER ${CMAKE_BUILD_TYPE} CMAKE_BUILD_TYPE_UC)
@ -242,7 +305,7 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
if (USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
# TODO: Allow to use non-static library as well.
set (EXCEPTION_HANDLING_LIBRARY "lib/libunwind${${CMAKE_POSTFIX_VARIABLE}}.a")
set (EXCEPTION_HANDLING_LIBRARY "${ClickHouse_BINARY_DIR}/contrib/libunwind-cmake/libunwind_static${${CMAKE_POSTFIX_VARIABLE}}.a")
else ()
set (EXCEPTION_HANDLING_LIBRARY "-lgcc_eh")
endif ()
@ -250,9 +313,15 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
message (STATUS "Using exception handling library: ${EXCEPTION_HANDLING_LIBRARY}")
if (USE_LIBCXX)
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lc++ -lc++abi ${EXCEPTION_HANDLING_LIBRARY} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
if (USE_INTERNAL_LIBCXX_LIBRARY)
set (LIBCXX_LIBS "${ClickHouse_BINARY_DIR}/contrib/libcxx-cmake/libcxx_static${${CMAKE_POSTFIX_VARIABLE}}.a ${ClickHouse_BINARY_DIR}/contrib/libcxxabi-cmake/libcxxabi_static${${CMAKE_POSTFIX_VARIABLE}}.a")
else ()
set (LIBCXX_LIBS "-lc++ -lc++abi")
endif ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic ${LIBCXX_LIBS} ${EXCEPTION_HANDLING_LIBRARY} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
else ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lstdc++ ${EXCEPTION_HANDLING_LIBRARY} -lgcc ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lstdc++ ${EXCEPTION_HANDLING_LIBRARY} ${COVERAGE_OPTION} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
endif ()
# Linking with GLIBC prevents portability of binaries to older systems.
@ -279,6 +348,7 @@ endif ()
if (DEFAULT_LIBS)
# Add default libs to all targets as the last dependency.
set(CMAKE_CXX_STANDARD_LIBRARIES ${DEFAULT_LIBS})
set(CMAKE_C_STANDARD_LIBRARIES ${DEFAULT_LIBS})
endif ()
if (NOT MAKE_STATIC_LIBRARIES)
@ -338,11 +408,21 @@ if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
option (NO_WERROR "Disable -Werror compiler option" ON)
endif ()
if (USE_LIBCXX)
set (HAVE_LIBCXX 1)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
endif()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -nostdinc++ -isystem ${LIBCXX_INCLUDE_DIR} -isystem ${LIBCXXABI_INCLUDE_DIR}")
endif ()
message (STATUS "Building for: ${CMAKE_SYSTEM} ${CMAKE_SYSTEM_PROCESSOR} ${CMAKE_LIBRARY_ARCHITECTURE} ; USE_STATIC_LIBRARIES=${USE_STATIC_LIBRARIES} MAKE_STATIC_LIBRARIES=${MAKE_STATIC_LIBRARIES} SPLIT_SHARED=${SPLIT_SHARED_LIBRARIES} UNBUNDLED=${UNBUNDLED} CCACHE=${CCACHE_FOUND} ${CCACHE_VERSION}")
include(GNUInstallDirs)
include (cmake/find_contrib_lib.cmake)
find_contrib_lib(double-conversion) # Must be before parquet
include (cmake/find_ssl.cmake)
include (cmake/lib_name.cmake)
include (cmake/find_icu.cmake)
@ -377,16 +457,16 @@ include (cmake/find_pdqsort.cmake)
include (cmake/find_hdfs3.cmake) # uses protobuf
include (cmake/find_consistent-hashing.cmake)
include (cmake/find_base64.cmake)
include (cmake/find_parquet.cmake)
include (cmake/find_hyperscan.cmake)
include (cmake/find_mimalloc.cmake)
include (cmake/find_simdjson.cmake)
include (cmake/find_rapidjson.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
find_contrib_lib(metrohash)
find_contrib_lib(btrie)
find_contrib_lib(double-conversion)
include (cmake/find_parquet.cmake)
if (ENABLE_TESTS)
include (cmake/find_gtest.cmake)
@ -416,8 +496,13 @@ if (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
if (GLIBC_COMPATIBILITY)
add_dependencies(${target_name} glibc-compatibility)
endif ()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
add_dependencies(${target_name} cxx_static cxxabi_static)
endif ()
if (USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
add_dependencies(${target_name} ${UNWIND_LIBRARY})
add_dependencies(${target_name} unwind_static)
endif ()
endif ()
endfunction ()
@ -458,4 +543,15 @@ if (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
add_default_dependencies(boost_program_options_internal)
add_default_dependencies(boost_system_internal)
add_default_dependencies(boost_regex_internal)
add_default_dependencies(roaring)
add_default_dependencies(btrie)
add_default_dependencies(cpuid)
add_default_dependencies(mysqlclient)
add_default_dependencies(zlib)
add_default_dependencies(thrift)
add_default_dependencies(brotli)
add_default_dependencies(libprotobuf)
add_default_dependencies(base64)
add_default_dependencies(readpassphrase)
add_default_dependencies(unwind_static)
endif ()


@ -13,6 +13,6 @@ ClickHouse is an open-source column-oriented database management system that all
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in Minsk](https://yandex.ru/promo/metrica/clickhouse-minsk) on July 11.
* [ClickHouse Meetup in Saint Petersburg](https://yandex.ru/promo/clickhouse/saint-petersburg-2019) on July 27.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.

cmake/find_cxx.cmake (new file)

@ -0,0 +1,26 @@
if (NOT APPLE)
option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
endif ()
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxx/include/vector")
message (WARNING "submodule contrib/libcxx is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXX_LIBRARY 0)
endif ()
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/src")
message (WARNING "submodule contrib/libcxxabi is missing. to fix try run: \n git submodule update --init --recursive")
set (USE_INTERNAL_LIBCXXABI_LIBRARY 0)
endif ()
if (NOT USE_INTERNAL_LIBCXX_LIBRARY)
find_library (LIBCXX_LIBRARY c++)
find_library (LIBCXXABI_LIBRARY c++abi)
else ()
set (LIBCXX_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx/include)
set (LIBCXXABI_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/include)
set (LIBCXX_LIBRARY cxx_static)
set (LIBCXXABI_LIBRARY cxxabi_static)
endif ()
message (STATUS "Using libcxx: ${LIBCXX_LIBRARY}")
message (STATUS "Using libcxxabi: ${LIBCXXABI_LIBRARY}")


@ -1,5 +1,5 @@
if (OS_LINUX AND NOT SANITIZE AND NOT ARCH_ARM AND NOT ARCH_32 AND NOT ARCH_PPC64LE)
option (ENABLE_MIMALLOC "Set to FALSE to disable usage of mimalloc for internal ClickHouse caches" ${NOT_UNBUNDLED})
option (ENABLE_MIMALLOC "Set to FALSE to disable usage of mimalloc for internal ClickHouse caches" FALSE)
endif ()
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include/mimalloc.h")
@ -8,6 +8,8 @@ if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include/mimalloc.h")
endif ()
if (ENABLE_MIMALLOC)
message (FATAL_ERROR "Mimalloc is not production ready. (Disable with cmake -D ENABLE_MIMALLOC=0). If you want to use mimalloc, you must manually remove this message.")
set (MIMALLOC_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/mimalloc/include)
set (USE_MIMALLOC 1)
set (MIMALLOC_LIBRARY mimalloc-static)


@ -13,7 +13,10 @@ if (NOT DEFINED ENABLE_POCO_NETSSL OR ENABLE_POCO_NETSSL)
list (APPEND POCO_COMPONENTS Crypto NetSSL)
endif ()
if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
set(ENABLE_POCO_MONGODB 1 CACHE BOOL "")
list (APPEND POCO_COMPONENTS MongoDB)
else ()
set(ENABLE_POCO_MONGODB 0 CACHE BOOL "")
endif ()
# TODO: after new poco release with SQL library rename ENABLE_POCO_ODBC -> ENABLE_POCO_SQLODBC
if (NOT DEFINED ENABLE_POCO_ODBC OR ENABLE_POCO_ODBC)
@ -37,6 +40,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
set (ENABLE_DATA_MYSQL 0 CACHE BOOL "")
set (ENABLE_DATA_POSTGRESQL 0 CACHE BOOL "")
set (ENABLE_ENCODINGS 0 CACHE BOOL "")
set (ENABLE_MONGODB ${ENABLE_POCO_MONGODB} CACHE BOOL "" FORCE)
# new after 2.0.0:
set (POCO_ENABLE_ZIP 0 CACHE BOOL "")
@ -60,7 +64,7 @@ elseif (NOT MISSING_INTERNAL_POCO_LIBRARY)
"${ClickHouse_SOURCE_DIR}/contrib/poco/Util/include/"
)
if (NOT DEFINED ENABLE_POCO_MONGODB OR ENABLE_POCO_MONGODB)
if (ENABLE_POCO_MONGODB)
set (Poco_MongoDB_LIBRARY PocoMongoDB)
set (Poco_MongoDB_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/poco/MongoDB/include/")
endif ()


@ -6,28 +6,39 @@ if (SANITIZE)
if (SANITIZE STREQUAL "address")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=address -fsanitize-address-use-after-scope")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=address -fsanitize-address-use-after-scope")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address -fsanitize-address-use-after-scope")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=address -fsanitize-address-use-after-scope")
endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libasan")
endif ()
elseif (SANITIZE STREQUAL "memory")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=memory")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=memory")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=memory -fsanitize-memory-track-origins")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=memory -fsanitize-memory-track-origins")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=memory")
endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libmsan")
endif ()
elseif (SANITIZE STREQUAL "thread")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=thread")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=thread")
endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libtsan")
endif ()
elseif (SANITIZE STREQUAL "undefined")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} -fsanitize=undefined -fno-sanitize-recover=all")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -fsanitize=undefined")
endif()
if (MAKE_STATIC_LIBRARIES AND CMAKE_CXX_COMPILER_ID STREQUAL "GNU")
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -static-libubsan")
endif ()


@ -1,24 +0,0 @@
# Uses MAKE_STATIC_LIBRARIES
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package (Threads)
include (cmake/test_compiler.cmake)
include (cmake/arch.cmake)
if (OS_LINUX AND COMPILER_CLANG)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS}")
option (USE_LIBCXX "Use libc++ and libc++abi instead of libstdc++ (only make sense on Linux with Clang)" ${HAVE_LIBCXX})
if (USE_LIBCXX)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++") # Ok for clang6, for older can cause 'not used option' warning
set (CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -D_LIBCPP_DEBUG=0") # More checks in debug build.
endif ()
endif ()
if (USE_STATIC_LIBRARIES AND HAVE_NO_PIE)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${FLAG_NO_PIE}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAG_NO_PIE}")
endif ()


@ -10,6 +10,17 @@ endif ()
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind-cmake)
endif ()
if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
add_subdirectory(libcxx-cmake)
add_subdirectory(libcxxabi-cmake)
endif()
if (USE_INTERNAL_BOOST_LIBRARY)
add_subdirectory (boost-cmake)
endif ()
@ -29,8 +40,7 @@ if (USE_INTERNAL_RE2_LIBRARY)
endif ()
if (USE_INTERNAL_DOUBLE_CONVERSION_LIBRARY)
set (BUILD_TESTING 0 CACHE INTERNAL "")
add_subdirectory (double-conversion)
add_subdirectory (double-conversion-cmake)
endif ()
if (USE_INTERNAL_CITYHASH_LIBRARY)
@ -52,10 +62,6 @@ if (USE_INTERNAL_BTRIE_LIBRARY)
add_subdirectory (libbtrie)
endif ()
if (USE_INTERNAL_UNWIND_LIBRARY)
add_subdirectory (libunwind)
endif ()
if (USE_INTERNAL_ZLIB_LIBRARY)
set (ZLIB_ENABLE_TESTS 0 CACHE INTERNAL "")
set (SKIP_INSTALL_ALL 1 CACHE INTERNAL "")
@ -154,7 +160,7 @@ if (ENABLE_ODBC AND USE_INTERNAL_ODBC_LIBRARY)
add_library(ODBC::ODBC ALIAS ${ODBC_LIBRARIES})
endif ()
if (USE_INTERNAL_CAPNP_LIBRARY)
if (ENABLE_CAPNP AND USE_INTERNAL_CAPNP_LIBRARY)
set (BUILD_TESTING 0 CACHE INTERNAL "")
set (_save ${CMAKE_CXX_EXTENSIONS})
set (CMAKE_CXX_EXTENSIONS)
@ -251,7 +257,6 @@ if(USE_INTERNAL_GTEST_LIBRARY)
add_subdirectory(${ClickHouse_SOURCE_DIR}/contrib/googletest/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
# avoid problems with <regexp.h>
target_compile_definitions (gtest INTERFACE GTEST_HAS_POSIX_RE=0)
target_include_directories (gtest SYSTEM INTERFACE ${ClickHouse_SOURCE_DIR}/contrib/googletest/include)
elseif(GTEST_SRC_DIR)
add_subdirectory(${GTEST_SRC_DIR}/googletest ${CMAKE_CURRENT_BINARY_DIR}/googletest)
target_compile_definitions(gtest INTERFACE GTEST_HAS_POSIX_RE=0)
@ -283,7 +288,7 @@ if (USE_INTERNAL_LLVM_LIBRARY)
endif ()
if (USE_INTERNAL_LIBGSASL_LIBRARY)
add_subdirectory(libgsasl)
add_subdirectory(libgsasl)
endif()
if (USE_INTERNAL_LIBXML2_LIBRARY)


@ -0,0 +1,14 @@
SET(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/double-conversion)
add_library(double-conversion
${LIBRARY_DIR}/double-conversion/bignum.cc
${LIBRARY_DIR}/double-conversion/bignum-dtoa.cc
${LIBRARY_DIR}/double-conversion/cached-powers.cc
${LIBRARY_DIR}/double-conversion/diy-fp.cc
${LIBRARY_DIR}/double-conversion/double-conversion.cc
${LIBRARY_DIR}/double-conversion/fast-dtoa.cc
${LIBRARY_DIR}/double-conversion/fixed-dtoa.cc
${LIBRARY_DIR}/double-conversion/strtod.cc)
target_include_directories(double-conversion SYSTEM PUBLIC "${LIBRARY_DIR}")


@ -15,7 +15,6 @@ ${JEMALLOC_SOURCE_DIR}/src/extent_mmap.c
${JEMALLOC_SOURCE_DIR}/src/hash.c
${JEMALLOC_SOURCE_DIR}/src/hook.c
${JEMALLOC_SOURCE_DIR}/src/jemalloc.c
${JEMALLOC_SOURCE_DIR}/src/jemalloc_cpp.cpp
${JEMALLOC_SOURCE_DIR}/src/large.c
${JEMALLOC_SOURCE_DIR}/src/log.c
${JEMALLOC_SOURCE_DIR}/src/malloc_io.c

contrib/libcxx (new submodule)

@ -0,0 +1 @@
Subproject commit 9807685d51db467e097ad5eb8d5c2c16922794b2


@ -0,0 +1,51 @@
set(LIBCXX_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx)
#set(LIBCXX_BINARY_DIR ${ClickHouse_BINARY_DIR}/contrib/libcxx)
set(SRCS
${LIBCXX_SOURCE_DIR}/src/optional.cpp
${LIBCXX_SOURCE_DIR}/src/variant.cpp
${LIBCXX_SOURCE_DIR}/src/chrono.cpp
${LIBCXX_SOURCE_DIR}/src/thread.cpp
${LIBCXX_SOURCE_DIR}/src/experimental/memory_resource.cpp
${LIBCXX_SOURCE_DIR}/src/iostream.cpp
${LIBCXX_SOURCE_DIR}/src/strstream.cpp
${LIBCXX_SOURCE_DIR}/src/ios.cpp
${LIBCXX_SOURCE_DIR}/src/future.cpp
${LIBCXX_SOURCE_DIR}/src/shared_mutex.cpp
${LIBCXX_SOURCE_DIR}/src/condition_variable.cpp
${LIBCXX_SOURCE_DIR}/src/hash.cpp
${LIBCXX_SOURCE_DIR}/src/string.cpp
${LIBCXX_SOURCE_DIR}/src/debug.cpp
#${LIBCXX_SOURCE_DIR}/src/support/win32/support.cpp
#${LIBCXX_SOURCE_DIR}/src/support/win32/locale_win32.cpp
#${LIBCXX_SOURCE_DIR}/src/support/win32/thread_win32.cpp
#${LIBCXX_SOURCE_DIR}/src/support/solaris/xlocale.cpp
${LIBCXX_SOURCE_DIR}/src/stdexcept.cpp
${LIBCXX_SOURCE_DIR}/src/utility.cpp
${LIBCXX_SOURCE_DIR}/src/any.cpp
${LIBCXX_SOURCE_DIR}/src/exception.cpp
${LIBCXX_SOURCE_DIR}/src/memory.cpp
${LIBCXX_SOURCE_DIR}/src/new.cpp
${LIBCXX_SOURCE_DIR}/src/valarray.cpp
${LIBCXX_SOURCE_DIR}/src/vector.cpp
${LIBCXX_SOURCE_DIR}/src/algorithm.cpp
${LIBCXX_SOURCE_DIR}/src/functional.cpp
${LIBCXX_SOURCE_DIR}/src/regex.cpp
${LIBCXX_SOURCE_DIR}/src/bind.cpp
${LIBCXX_SOURCE_DIR}/src/mutex.cpp
${LIBCXX_SOURCE_DIR}/src/charconv.cpp
${LIBCXX_SOURCE_DIR}/src/typeinfo.cpp
${LIBCXX_SOURCE_DIR}/src/locale.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/operations.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/int128_builtins.cpp
${LIBCXX_SOURCE_DIR}/src/filesystem/directory_iterator.cpp
${LIBCXX_SOURCE_DIR}/src/system_error.cpp
${LIBCXX_SOURCE_DIR}/src/random.cpp
)
add_library(cxx_static ${SRCS})
target_include_directories(cxx_static PUBLIC ${LIBCXX_SOURCE_DIR}/include)
target_compile_definitions(cxx_static PRIVATE -D_LIBCPP_BUILDING_LIBRARY -DLIBCXX_BUILDING_LIBCXXABI)
target_compile_options(cxx_static PRIVATE -nostdinc++)

contrib/libcxxabi (new submodule)

@ -0,0 +1 @@
Subproject commit d56efcc7a52739518dbe7df9e743073e00951fa1


@ -0,0 +1,34 @@
set(LIBCXXABI_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi)
set(LIBCXX_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx)
#set(LIBCXXABI_BINARY_DIR ${ClickHouse_BINARY_DIR}/contrib/libcxxabi)
set(SRCS
${LIBCXXABI_SOURCE_DIR}/src/stdlib_stdexcept.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_virtual.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_thread_atexit.cpp
${LIBCXXABI_SOURCE_DIR}/src/fallback_malloc.cpp
#${LIBCXXABI_SOURCE_DIR}/src/cxa_noexception.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_guard.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_default_handlers.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_personality.cpp
${LIBCXXABI_SOURCE_DIR}/src/stdlib_exception.cpp
${LIBCXXABI_SOURCE_DIR}/src/abort_message.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_demangle.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_unexpected.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_exception.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_handlers.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_exception_storage.cpp
${LIBCXXABI_SOURCE_DIR}/src/private_typeinfo.cpp
${LIBCXXABI_SOURCE_DIR}/src/stdlib_typeinfo.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_aux_runtime.cpp
${LIBCXXABI_SOURCE_DIR}/src/cxa_vector.cpp
${LIBCXXABI_SOURCE_DIR}/src/stdlib_new_delete.cpp
)
add_library(cxxabi_static ${SRCS})
target_include_directories(cxxabi_static PUBLIC ${LIBCXXABI_SOURCE_DIR}/include ${LIBCXX_SOURCE_DIR}/include)
target_compile_definitions(cxxabi_static PRIVATE -D_LIBCPP_BUILDING_LIBRARY)
target_compile_options(cxxabi_static PRIVATE -nostdinc++ -fno-sanitize=undefined) # If we don't disable UBSan, infinite recursion happens in dynamic_cast.


@ -77,8 +77,6 @@
#define HAVE_PTHREAD_SETNAME_GNU 1
// python
//#define HAVE_PYTHON 1
// C11 threads
#if (__STDC_VERSION__ >= 201112L) && !defined(__STDC_NO_THREADS__)
# define WITH_C11THREADS 1
#endif
// disable C11 threads for compatibility with old libc
#define WITH_C11THREADS 0
#endif /* _CONFIG_H_ */


@ -0,0 +1,31 @@
set(LIBUNWIND_SOURCE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libunwind)
set(LIBUNWIND_CXX_SOURCES
${LIBUNWIND_SOURCE_DIR}/src/libunwind.cpp
${LIBUNWIND_SOURCE_DIR}/src/Unwind-EHABI.cpp
${LIBUNWIND_SOURCE_DIR}/src/Unwind-seh.cpp)
if (APPLE)
set(LIBUNWIND_CXX_SOURCES ${LIBUNWIND_CXX_SOURCES} ${LIBUNWIND_SOURCE_DIR}/src/Unwind_AppleExtras.cpp)
endif ()
set(LIBUNWIND_C_SOURCES
${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1.c
${LIBUNWIND_SOURCE_DIR}/src/UnwindLevel1-gcc-ext.c
${LIBUNWIND_SOURCE_DIR}/src/Unwind-sjlj.c)
set_source_files_properties(${LIBUNWIND_C_SOURCES} PROPERTIES COMPILE_FLAGS "-std=c99")
set(LIBUNWIND_ASM_SOURCES
${LIBUNWIND_SOURCE_DIR}/src/UnwindRegistersRestore.S
${LIBUNWIND_SOURCE_DIR}/src/UnwindRegistersSave.S)
set_source_files_properties(${LIBUNWIND_ASM_SOURCES} PROPERTIES LANGUAGE C)
set(LIBUNWIND_SOURCES
${LIBUNWIND_CXX_SOURCES}
${LIBUNWIND_C_SOURCES}
${LIBUNWIND_ASM_SOURCES})
add_library(unwind_static ${LIBUNWIND_SOURCES})
target_include_directories(unwind_static PUBLIC ${LIBUNWIND_SOURCE_DIR}/include)
target_compile_definitions(unwind_static PRIVATE -D_LIBUNWIND_NO_HEAP=1 -D_DEBUG -D_LIBUNWIND_IS_NATIVE_ONLY)
target_compile_options(unwind_static PRIVATE -fno-exceptions -funwind-tables -fno-sanitize=all -nostdinc++ -fno-rtti)

contrib/poco (submodule)

@ -1 +1 @@
Subproject commit ece721f1085e3894cb5286e8560af84cd1445326
Subproject commit ea2516be366a73a02a82b499ed4a7db1d40037e0

contrib/simdjson (submodule)

@ -1 +1 @@
Subproject commit 2151ad7f34cf773a23f086e941d661f8a8873144
Subproject commit 3bd3116cf8faf6d482dc31423b16533bfa2696f7


@ -11,8 +11,9 @@ set(SIMDJSON_SRC
${SIMDJSON_SRC_DIR}/stage2_build_tape.cpp
${SIMDJSON_SRC_DIR}/parsedjson.cpp
${SIMDJSON_SRC_DIR}/parsedjsoniterator.cpp
${SIMDJSON_SRC_DIR}/simdjson.cpp
)
add_library(${SIMDJSON_LIBRARY} ${SIMDJSON_SRC})
target_include_directories(${SIMDJSON_LIBRARY} PUBLIC "${SIMDJSON_INCLUDE_DIR}")
target_include_directories(${SIMDJSON_LIBRARY} SYSTEM PUBLIC "${SIMDJSON_INCLUDE_DIR}")
target_compile_options(${SIMDJSON_LIBRARY} PRIVATE -mavx2 -mbmi -mbmi2 -mpclmul)


@ -48,7 +48,7 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "Clang")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Wshadow -Wshadow-uncaptured-local -Wextra-semi -Wcomma -Winconsistent-missing-destructor-override -Wunused-exception-parameter -Wcovered-switch-default -Wold-style-cast -Wrange-loop-analysis -Wunused-member-function -Wunreachable-code -Wunreachable-code-return -Wnewline-eof -Wembedded-directive -Wgnu-case-range -Wunused-macros -Wconditional-uninitialized -Wdeprecated -Wundef -Wreserved-id-macro -Wredundant-parens -Wzero-as-null-pointer-constant")
if (WEVERYTHING)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-missing-noreturn -Wno-padded -Wno-switch-enum -Wno-shadow-field-in-constructor -Wno-deprecated-dynamic-exception-spec -Wno-float-equal -Wno-weak-vtables -Wno-shift-sign-overflow -Wno-sign-conversion -Wno-conversion -Wno-exit-time-destructors -Wno-undefined-func-template -Wno-documentation-unknown-command -Wno-missing-variable-declarations -Wno-unused-template -Wno-global-constructors -Wno-c99-extensions -Wno-missing-prototypes -Wno-weak-template-vtables -Wno-zero-length-array -Wno-gnu-anonymous-struct -Wno-nested-anon-types -Wno-double-promotion -Wno-disabled-macro-expansion -Wno-used-but-marked-unused -Wno-vla-extension -Wno-vla -Wno-packed")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -Weverything -Wno-c++98-compat -Wno-c++98-compat-pedantic -Wno-padded -Wno-switch-enum -Wno-shadow-field-in-constructor -Wno-deprecated-dynamic-exception-spec -Wno-float-equal -Wno-weak-vtables -Wno-shift-sign-overflow -Wno-sign-conversion -Wno-conversion -Wno-exit-time-destructors -Wno-undefined-func-template -Wno-documentation-unknown-command -Wno-missing-variable-declarations -Wno-unused-template -Wno-global-constructors -Wno-c99-extensions -Wno-missing-prototypes -Wno-weak-template-vtables -Wno-zero-length-array -Wno-gnu-anonymous-struct -Wno-nested-anon-types -Wno-double-promotion -Wno-disabled-macro-expansion -Wno-vla-extension -Wno-vla -Wno-packed")
# TODO Enable conversion, sign-conversion, double-promotion warnings.
endif ()
@ -102,6 +102,7 @@ add_headers_and_sources(clickhouse_common_io src/Common/HashTable)
add_headers_and_sources(clickhouse_common_io src/IO)
add_headers_and_sources(dbms src/Core)
add_headers_and_sources(dbms src/Compression/)
add_headers_and_sources(dbms src/DataStreams)
add_headers_and_sources(dbms src/DataTypes)
add_headers_and_sources(dbms src/Databases)
@ -113,6 +114,18 @@ add_headers_and_sources(dbms src/Storages/Distributed)
add_headers_and_sources(dbms src/Storages/MergeTree)
add_headers_and_sources(dbms src/Client)
add_headers_and_sources(dbms src/Formats)
add_headers_and_sources(dbms src/Processors)
add_headers_and_sources(dbms src/Processors/Executors)
add_headers_and_sources(dbms src/Processors/Formats)
add_headers_and_sources(dbms src/Processors/Formats/Impl)
add_headers_and_sources(dbms src/Processors/Transforms)
add_headers_and_sources(dbms src/Processors/Sources)
add_headers_only(dbms src/Server)
if(USE_RDKAFKA)
add_headers_and_sources(dbms src/Storages/Kafka)
endif()
list (APPEND clickhouse_common_io_sources ${CONFIG_BUILD})
list (APPEND clickhouse_common_io_headers ${CONFIG_VERSION} ${CONFIG_COMMON})
@ -222,6 +235,17 @@ target_link_libraries(clickhouse_common_io
roaring
)
if(ZSTD_LIBRARY)
target_link_libraries(clickhouse_common_io PUBLIC ${ZSTD_LIBRARY})
endif()
if (USE_RDKAFKA)
target_link_libraries(dbms PRIVATE ${CPPKAFKA_LIBRARY} ${RDKAFKA_LIBRARY})
if(NOT USE_INTERNAL_RDKAFKA_LIBRARY)
target_include_directories(dbms SYSTEM BEFORE PRIVATE ${RDKAFKA_INCLUDE_DIR})
endif()
endif()
if(RE2_INCLUDE_DIR)
target_include_directories(clickhouse_common_io SYSTEM BEFORE PUBLIC ${RE2_INCLUDE_DIR})
@ -241,20 +265,22 @@ if(CPUINFO_LIBRARY)
endif()
target_link_libraries (dbms
PUBLIC
clickhouse_compression
PRIVATE
clickhouse_parsers
clickhouse_common_config
clickhouse_common_zookeeper
string_utils # FIXME: not sure if it's private
PUBLIC
clickhouse_common_io
PRIVATE
clickhouse_dictionaries_embedded
${LZ4_LIBRARY}
PUBLIC
${MYSQLXX_LIBRARY}
PRIVATE
${BTRIE_LIBRARIES}
${Boost_PROGRAM_OPTIONS_LIBRARY}
${Boost_FILESYSTEM_LIBRARY}
PUBLIC
${Boost_SYSTEM_LIBRARY}
Threads::Threads
@ -263,6 +289,15 @@ target_link_libraries (dbms
target_include_directories(dbms PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/src/Core/include)
target_include_directories(clickhouse_common_io PUBLIC ${CMAKE_CURRENT_BINARY_DIR}/src/Core/include) # uses some includes from core
target_include_directories(dbms SYSTEM BEFORE PUBLIC ${PDQSORT_INCLUDE_DIR})
target_include_directories(dbms SYSTEM PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories(dbms SYSTEM BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
target_include_directories(dbms SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
@ -318,10 +353,6 @@ if (USE_CAPNP)
endif ()
endif ()
if (USE_RDKAFKA)
target_link_libraries (dbms PRIVATE clickhouse_storage_kafka)
endif ()
if (USE_PARQUET)
target_link_libraries(dbms PRIVATE ${PARQUET_LIBRARY})
if (NOT USE_INTERNAL_PARQUET_LIBRARY OR USE_INTERNAL_PARQUET_LIBRARY_NATIVE_CMAKE)
@ -354,6 +385,7 @@ endif()
if (USE_JEMALLOC)
target_include_directories (dbms SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # used in Interpreters/AsynchronousMetrics.cpp
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # new_delete.cpp
endif ()
target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/src/Formats/include)
@ -376,6 +408,10 @@ if (ENABLE_TESTS AND USE_GTEST)
# attach all dbms gtest sources
grep_gtest_sources(${ClickHouse_SOURCE_DIR}/dbms dbms_gtest_sources)
add_executable(unit_tests_dbms ${dbms_gtest_sources})
# gtest framework has substandard code
target_compile_options(unit_tests_dbms PRIVATE -Wno-zero-as-null-pointer-constant -Wno-undef -Wno-sign-compare -Wno-used-but-marked-unused -Wno-missing-noreturn)
target_link_libraries(unit_tests_dbms PRIVATE ${GTEST_BOTH_LIBRARIES} clickhouse_functions clickhouse_parsers dbms clickhouse_common_zookeeper)
add_check(unit_tests_dbms)
endif ()


@ -1,11 +1,11 @@
# This strings autochanged from release_lib.sh:
set(VERSION_REVISION 54423)
set(VERSION_REVISION 54424)
set(VERSION_MAJOR 19)
set(VERSION_MINOR 11)
set(VERSION_PATCH 0)
set(VERSION_GITHASH badb6ab8310ed94e20f43ac9a9a227f7a2590009)
set(VERSION_DESCRIBE v19.11.0-testing)
set(VERSION_STRING 19.11.0)
set(VERSION_MINOR 12)
set(VERSION_PATCH 1)
set(VERSION_GITHASH a584f0ca6cb5df9b0d9baf1e2e1eaa7d12a20a44)
set(VERSION_DESCRIBE v19.12.1.1-prestable)
set(VERSION_STRING 19.12.1.1)
# end of autochange
set(VERSION_EXTRA "" CACHE STRING "")


@ -61,6 +61,8 @@ public:
randomize(randomize_), max_iterations(max_iterations_), max_time(max_time_),
json_path(json_path_), settings(settings_), global_context(Context::createGlobal()), pool(concurrency)
{
global_context.makeGlobalContext();
std::cerr << std::fixed << std::setprecision(3);
/// This is needed to receive blocks with columns of AggregateFunction data type


@ -1,5 +1,5 @@
set(CLICKHOUSE_BENCHMARK_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/Benchmark.cpp)
set(CLICKHOUSE_BENCHMARK_LINK PRIVATE clickhouse_aggregate_functions clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_BENCHMARK_LINK PRIVATE dbms clickhouse_aggregate_functions clickhouse_common_config ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_BENCHMARK_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
clickhouse_program_add(benchmark)


@ -3,7 +3,7 @@ set(CLICKHOUSE_CLIENT_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/ConnectionParameters.cpp
)
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io clickhouse_parsers string_utils ${LINE_EDITING_LIBS} ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_CLIENT_INCLUDE SYSTEM PRIVATE ${READLINE_INCLUDE_DIR} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/include)
include(CheckSymbolExists)


@ -218,6 +218,7 @@ private:
configReadClient(config(), home_path);
context.makeGlobalContext();
context.setApplicationType(Context::ApplicationType::CLIENT);
/// settings and limits could be specified in config file, but passed settings has higher priority


@ -1,7 +1,7 @@
# Also in utils
set(CLICKHOUSE_COMPRESSOR_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/Compressor.cpp)
set(CLICKHOUSE_COMPRESSOR_LINK PRIVATE clickhouse_compression clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_COMPRESSOR_LINK PRIVATE dbms clickhouse_parsers ${Boost_PROGRAM_OPTIONS_LIBRARY})
#set(CLICKHOUSE_COMPRESSOR_INCLUDE SYSTEM PRIVATE ...)
clickhouse_program_add(compressor)


@ -1,5 +1,5 @@
set(CLICKHOUSE_COPIER_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ClusterCopier.cpp)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries PUBLIC daemon)
set(CLICKHOUSE_COPIER_LINK PRIVATE clickhouse_common_zookeeper clickhouse_parsers clickhouse_functions clickhouse_table_functions clickhouse_aggregate_functions clickhouse_dictionaries string_utils PUBLIC daemon)
set(CLICKHOUSE_COPIER_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
clickhouse_program_add(copier)


@ -2171,10 +2171,10 @@ void ClusterCopierApp::mainImpl()
<< "revision " << ClickHouseRevision::get() << ")");
auto context = std::make_unique<Context>(Context::createGlobal());
context->makeGlobalContext();
SCOPE_EXIT(context->shutdown());
context->setConfig(loaded_config.configuration);
context->setGlobalContext(*context);
context->setApplicationType(Context::ApplicationType::LOCAL);
context->setPath(process_path);


@ -1,5 +1,5 @@
set(CLICKHOUSE_EXTRACT_FROM_CONFIG_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/ExtractFromConfig.cpp)
set(CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK PRIVATE clickhouse_common_config clickhouse_common_io ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_EXTRACT_FROM_CONFIG_LINK PRIVATE clickhouse_common_config clickhouse_common_io clickhouse_common_zookeeper ${Boost_PROGRAM_OPTIONS_LIBRARY})
#set(CLICKHOUSE_EXTRACT_FROM_CONFIG_INCLUDE SYSTEM PRIVATE ...)
clickhouse_program_add(extract-from-config)


@ -1,5 +1,5 @@
set(CLICKHOUSE_LOCAL_SOURCES ${CMAKE_CURRENT_SOURCE_DIR}/LocalServer.cpp)
set(CLICKHOUSE_LOCAL_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_LOCAL_LINK PRIVATE clickhouse_storages_system clickhouse_dictionaries clickhouse_common_config clickhouse_common_io clickhouse_functions clickhouse_aggregate_functions clickhouse_parsers clickhouse_table_functions ${Boost_PROGRAM_OPTIONS_LIBRARY})
#set(CLICKHOUSE_LOCAL_INCLUDE SYSTEM PRIVATE ...)
clickhouse_program_add(local)


@ -131,7 +131,7 @@ try
context = std::make_unique<Context>(Context::createGlobal());
context->setGlobalContext(*context);
context->makeGlobalContext();
context->setApplicationType(Context::ApplicationType::LOCAL);
tryInitPath();
@ -275,8 +275,8 @@ void LocalServer::processQueries()
if (!parse_res.second)
throw Exception("Cannot parse and execute the following part of query: " + String(parse_res.first), ErrorCodes::SYNTAX_ERROR);
context->setSessionContext(*context);
context->setQueryContext(*context);
context->makeSessionContext();
context->makeQueryContext();
context->setUser("default", "", Poco::Net::SocketAddress{}, "");
context->setCurrentQueryId("");


@ -1024,6 +1024,7 @@ try
}
Context context = Context::createGlobal();
context.makeGlobalContext();
ReadBufferFromFileDescriptor file_in(STDIN_FILENO);
WriteBufferFromFileDescriptor file_out(STDOUT_FILENO);


@ -11,7 +11,7 @@ set(CLICKHOUSE_ODBC_BRIDGE_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/validateODBCConnectionString.cpp
)
set(CLICKHOUSE_ODBC_BRIDGE_LINK PRIVATE dbms clickhouse_common_io PUBLIC daemon)
set(CLICKHOUSE_ODBC_BRIDGE_LINK PRIVATE dbms clickhouse_parsers PUBLIC daemon)
set(CLICKHOUSE_ODBC_BRIDGE_INCLUDE PUBLIC ${ClickHouse_SOURCE_DIR}/libs/libdaemon/include)
if (USE_POCO_SQLODBC)


@ -160,7 +160,7 @@ int ODBCBridge::main(const std::vector<std::string> & /*args*/)
http_params->setKeepAliveTimeout(keep_alive_timeout);
context = std::make_shared<Context>(Context::createGlobal());
context->setGlobalContext(*context);
context->makeGlobalContext();
auto server = Poco::Net::HTTPServer(
new HandlerFactory("ODBCRequestHandlerFactory-factory", keep_alive_timeout, context), server_pool, socket, http_params);


@ -1,4 +1,4 @@
set(CLICKHOUSE_PERFORMANCE_TEST_SOURCES
set(CLICKHOUSE_PERFORMANCE_TEST_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/JSONString.cpp
${CMAKE_CURRENT_SOURCE_DIR}/StopConditionsSet.cpp
${CMAKE_CURRENT_SOURCE_DIR}/TestStopConditions.cpp
@ -12,7 +12,7 @@ set(CLICKHOUSE_PERFORMANCE_TEST_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/PerformanceTestSuite.cpp
)
set(CLICKHOUSE_PERFORMANCE_TEST_LINK PRIVATE dbms clickhouse_common_io clickhouse_common_config ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_PERFORMANCE_TEST_LINK PRIVATE dbms clickhouse_common_config ${Boost_FILESYSTEM_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY})
set(CLICKHOUSE_PERFORMANCE_TEST_INCLUDE SYSTEM PRIVATE ${PCG_RANDOM_INCLUDE_DIR})
clickhouse_program_add(performance-test)


@ -14,10 +14,15 @@ std::vector<XMLConfigurationPtr> ConfigPreprocessor::processConfig(
{
std::vector<XMLConfigurationPtr> result;
for (const auto & path : paths)
for (const auto & path_str : paths)
{
result.emplace_back(XMLConfigurationPtr(new XMLConfiguration(path)));
result.back()->setString("path", Poco::Path(path).absolute().toString());
auto test = XMLConfigurationPtr(new XMLConfiguration(path_str));
result.push_back(test);
const auto path = Poco::Path(path_str);
test->setString("path", path.absolute().toString());
if (test->getString("name", "") == "")
test->setString("name", path.getBaseName());
}
/// Leave tests:


@ -3,6 +3,7 @@
#include <Core/Types.h>
#include <Common/CpuId.h>
#include <common/getMemoryAmount.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
@ -296,6 +297,47 @@ void PerformanceTest::runQueries(
break;
}
}
if (got_SIGINT)
{
return;
}
// Pull memory usage data from query log. The log is normally filled in
// background, so we have to flush it synchronously here to see all the
// previous queries.
{
RemoteBlockInputStream flush_log(connection, "system flush logs",
{} /* header */, context);
flush_log.readPrefix();
while (flush_log.read());
flush_log.readSuffix();
}
for (auto & statistics : statistics_by_run)
{
RemoteBlockInputStream log_reader(connection,
"select memory_usage from system.query_log where type = 2 and query_id = '"
+ statistics.query_id + "'",
{} /* header */, context);
log_reader.readPrefix();
Block block = log_reader.read();
if (block.columns() == 0)
{
LOG_WARNING(log, "Query '" << statistics.query_id << "' is not found in query log.");
continue;
}
assert(block.columns() == 1);
assert(block.getDataTypes()[0]->getName() == "UInt64");
ColumnPtr column = block.getByPosition(0).column;
assert(column->size() == 1);
StringRef ref = column->getDataAt(0);
assert(ref.size == sizeof(UInt64));
statistics.memory_usage = *reinterpret_cast<const UInt64*>(ref.data);
log_reader.readSuffix();
}
}
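
The flush-then-read sequence above can be reproduced by hand. A rough SQL equivalent, where the query_id value is a hypothetical placeholder (type = 2 marks finished queries in system.query_log):

```sql
-- Force the query log, normally filled in the background, to be written out.
SYSTEM FLUSH LOGS;

-- Read the peak memory usage of one finished query.
SELECT memory_usage
FROM system.query_log
WHERE type = 2 AND query_id = 'replace-with-a-real-query-id';
```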


@ -89,6 +89,7 @@ public:
, input_files(input_files_)
, log(&Poco::Logger::get("PerformanceTestSuite"))
{
global_context.makeGlobalContext();
global_context.getSettingsRef().copyChangesFrom(cmd_settings);
if (input_files.size() < 1)
throw Exception("No tests were specified", ErrorCodes::BAD_ARGUMENTS);
@ -259,15 +260,12 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
if (input_files.empty())
throw DB::Exception("Did not find any xml files", DB::ErrorCodes::BAD_ARGUMENTS);
else
LOG_INFO(log, "Found " << input_files.size() << " files");
}
else
{
input_files = options["input-files"].as<std::vector<std::string>>();
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
std::vector<std::string> collected_files;
std::vector<std::string> collected_files;
for (const std::string & filename : input_files)
{
fs::path file(filename);
@ -289,6 +287,8 @@ static std::vector<std::string> getInputFiles(const po::variables_map & options,
input_files = std::move(collected_files);
}
LOG_INFO(log, "Found " + std::to_string(input_files.size()) + " input files");
std::sort(input_files.begin(), input_files.end());
return input_files;
}


@ -157,6 +157,8 @@ std::string ReportBuilder::buildFullReport(
runJSON.set("avg_bytes_per_second", statistics.avg_bytes_speed_value);
}
runJSON.set("memory_usage", statistics.memory_usage);
run_infos.push_back(runJSON);
}
}


@ -19,6 +19,7 @@ struct TestStats
Stopwatch avg_bytes_speed_watch;
bool last_query_was_cancelled = false;
std::string query_id;
size_t queries = 0;
@ -49,6 +50,8 @@ struct TestStats
size_t number_of_rows_speed_info_batches = 0;
size_t number_of_bytes_speed_info_batches = 0;
UInt64 memory_usage = 0;
bool ready = false; // check if a query wasn't interrupted by SIGINT
std::string exception;


@ -2,9 +2,11 @@
#include <IO/Progress.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <Core/Block.h>
#include <Poco/UUIDGenerator.h>
namespace DB
{
namespace
{
@ -36,7 +38,7 @@ void checkFulfilledConditionsAndUpdate(
}
}
}
} // anonymous namespace
void executeQuery(
Connection & connection,
@ -47,12 +49,18 @@ void executeQuery(
Context & context,
const Settings & settings)
{
static const std::string query_id_prefix
= Poco::UUIDGenerator::defaultGenerator().create().toString() + "-";
static int next_query_id = 1;
statistics.watch_per_query.restart();
statistics.last_query_was_cancelled = false;
statistics.last_query_rows_read = 0;
statistics.last_query_bytes_read = 0;
statistics.query_id = query_id_prefix + std::to_string(next_query_id++);
RemoteBlockInputStream stream(connection, query, {}, context, &settings);
stream.setQueryId(statistics.query_id);
stream.setProgressCallback(
[&](const Progress & value)
@ -70,4 +78,5 @@ void executeQuery(
statistics.setTotalTime();
}
}


@ -8,11 +8,17 @@ set(CLICKHOUSE_SERVER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/RootRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp
${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandlerFactory.cpp
)
set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY})
if (USE_POCO_NETSSL)
set(CLICKHOUSE_SERVER_SOURCES
${CLICKHOUSE_SERVER_SOURCES}
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandlerFactory.cpp
)
endif ()
set(CLICKHOUSE_SERVER_LINK PRIVATE clickhouse_dictionaries clickhouse_common_io clickhouse_common_config clickhouse_common_zookeeper clickhouse_parsers string_utils PUBLIC daemon PRIVATE clickhouse_storages_system clickhouse_functions clickhouse_aggregate_functions clickhouse_table_functions ${Poco_Net_LIBRARY})
if (USE_POCO_NETSSL)
set(CLICKHOUSE_SERVER_LINK ${CLICKHOUSE_SERVER_LINK} PRIVATE ${Poco_NetSSL_LIBRARY} ${Poco_Crypto_LIBRARY})
endif ()


@ -211,7 +211,6 @@ void HTTPHandler::processQuery(
Output & used_output)
{
Context context = server.context();
context.setGlobalContext(server.context());
CurrentThread::QueryScope query_scope(context);


@ -18,13 +18,19 @@
#include <limits>
#include <ext/scope_guard.h>
#include <openssl/rsa.h>
namespace DB
{
using namespace MySQLProtocol;
using Poco::Net::SecureStreamSocket;
using Poco::Net::SSLManager;
namespace ErrorCodes
{
extern const int MYSQL_CLIENT_INSUFFICIENT_CAPABILITIES;
@ -48,7 +54,7 @@ MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & so
void MySQLHandler::run()
{
connection_context = server.context();
connection_context.setSessionContext(connection_context);
connection_context.makeSessionContext();
connection_context.setDefaultFormat("MySQLWire");
in = std::make_shared<ReadBufferFromPocoSocket>(socket());


@ -4,7 +4,6 @@
#include <Poco/Net/SecureStreamSocket.h>
#include <Common/getFQDNOrHostName.h>
#include <Core/MySQLProtocol.h>
#include <openssl/rsa.h>
#include "IServer.h"


@ -187,7 +187,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
* settings, available functions, data types, aggregate functions, databases...
*/
global_context = std::make_unique<Context>(Context::createGlobal());
global_context->setGlobalContext(*global_context);
global_context->makeGlobalContext();
global_context->setApplicationType(Context::ApplicationType::SERVER);
bool has_zookeeper = config().has("zookeeper");
@ -533,10 +533,18 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (!TaskStatsInfoGetter::checkPermissions())
{
LOG_INFO(log, "It looks like the process has no CAP_NET_ADMIN capability, 'taskstats' performance statistics will be disabled."
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'."
" Note that it will not work on 'nosuid' mounted filesystems."
" It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers.");
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_net_admin=+ep /usr/bin/clickhouse'."
" Note that it will not work on 'nosuid' mounted filesystems."
" It also doesn't work if you run clickhouse-server inside network namespace as it happens in some containers.");
}
if (!hasLinuxCapability(CAP_SYS_NICE))
{
LOG_INFO(log, "It looks like the process has no CAP_SYS_NICE capability, the setting 'os_thread_nice' will have no effect."
" It could happen due to incorrect ClickHouse package installation."
" You could resolve the problem manually with 'sudo setcap cap_sys_nice=+ep /usr/bin/clickhouse'."
" Note that it will not work on 'nosuid' mounted filesystems.");
}
#else
LOG_INFO(log, "TaskStats is not implemented for this OS. IO accounting will be disabled.");
@ -738,6 +746,7 @@ int Server::main(const std::vector<std::string> & /*args*/)
if (config().has("mysql_port"))
{
#if USE_POCO_NETSSL
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, config().getInt("mysql_port"), /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
@ -749,6 +758,10 @@ int Server::main(const std::vector<std::string> & /*args*/)
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
#else
throw Exception{"SSL support for MySQL protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
}
catch (const Poco::Exception & e)


@ -28,6 +28,9 @@
#include <Storages/ColumnDefault.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <Compression/CompressionFactory.h>
#include <common/logger_useful.h>
#include <Processors/Formats/LazyOutputFormat.h>
#include "TCPHandler.h"
@ -54,7 +57,7 @@ void TCPHandler::runImpl()
ThreadStatus thread_status;
connection_context = server.context();
connection_context.setSessionContext(connection_context);
connection_context.makeSessionContext();
Settings global_settings = connection_context.getSettings();
@ -170,12 +173,13 @@ void TCPHandler::runImpl()
send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;
/// Should we send internal logs to client?
const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
&& query_context->getSettingsRef().send_logs_level.value != LogsLevel::none)
&& client_logs_level.value != LogsLevel::none)
{
state.logs_queue = std::make_shared<InternalTextLogsQueue>();
state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context->getSettingsRef().send_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
}
query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)
@ -207,6 +211,8 @@ void TCPHandler::runImpl()
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
processInsertQuery(global_settings);
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
else
processOrdinaryQuery();
@ -378,7 +384,10 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
{
const auto & db_and_table = query_context->getInsertionTable();
if (query_context->getSettingsRef().input_format_defaults_for_omitted_fields)
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
{
if (!db_and_table.second.empty())
sendTableColumns(query_context->getTable(db_and_table.first, db_and_table.second)->getColumns());
}
}
/// Send block to the client - table structure.
@ -447,9 +456,9 @@ void TCPHandler::processOrdinaryQuery()
*/
if (!block && !isQueryCancelled())
{
sendTotals();
sendExtremes();
sendProfileInfo();
sendTotals(state.io.in->getTotals());
sendExtremes(state.io.in->getExtremes());
sendProfileInfo(state.io.in->getProfileInfo());
sendProgress();
sendLogs();
}
@ -465,6 +474,129 @@ void TCPHandler::processOrdinaryQuery()
state.io.onFinish();
}
void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
{
auto & pipeline = state.io.pipeline;
/// Send the header block, so that the client can prepare an output format for the data to come.
{
auto & header = pipeline.getHeader();
if (header)
sendData(header);
}
auto lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
{
auto thread_group = CurrentThread::getGroup();
ThreadPool pool(1);
auto executor = pipeline.execute();
std::atomic_bool exception = false;
pool.schedule([&]()
{
/// ThreadStatus thread_status;
if (thread_group)
CurrentThread::attachTo(thread_group);
SCOPE_EXIT(
if (thread_group)
CurrentThread::detachQueryIfNotDetached();
);
CurrentMetrics::Increment query_thread_metric_increment{CurrentMetrics::QueryThread};
setThreadName("QueryPipelineEx");
try
{
executor->execute(num_threads);
}
catch (...)
{
exception = true;
throw;
}
});
/// Wait in case of exception. Delete pipeline to release memory.
SCOPE_EXIT(
/// Clear the queue in case somebody is waiting for lazy_format to push.
lazy_format->finish();
lazy_format->clearQueue();
pool.wait();
pipeline = QueryPipeline()
);
while (true)
{
Block block;
while (true)
{
if (isQueryCancelled())
{
/// A packet was received requesting to stop execution of the request.
executor->cancel();
break;
}
else
{
if (after_send_progress.elapsed() / 1000 >= query_context->getSettingsRef().interactive_delay)
{
/// Some time has passed and there is progress to report.
after_send_progress.restart();
sendProgress();
}
sendLogs();
if ((block = lazy_format->getBlock(query_context->getSettingsRef().interactive_delay / 1000)))
break;
if (lazy_format->isFinished())
break;
if (exception)
{
pool.wait();
break;
}
}
}
/** If the data has run out, we send the profiling data and totals
* before the final empty block, so that this information can be used
* in the suffix output of the stream.
* If the request was interrupted, then `sendTotals` and other methods must not be called,
* because we have not read all the data yet,
* and there may be ongoing calculations in other threads at the same time.
*/
if (!block && !isQueryCancelled())
{
pool.wait();
pipeline.finalize();
sendTotals(lazy_format->getTotals());
sendExtremes(lazy_format->getExtremes());
sendProfileInfo(lazy_format->getProfileInfo());
sendProgress();
sendLogs();
}
sendData(block);
if (!block)
break;
}
}
state.io.onFinish();
}
void TCPHandler::processTablesStatusRequest()
{
@ -495,18 +627,16 @@ void TCPHandler::processTablesStatusRequest()
}
void TCPHandler::sendProfileInfo()
void TCPHandler::sendProfileInfo(const BlockStreamProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
state.io.in->getProfileInfo().write(*out);
info.write(*out);
out->next();
}
void TCPHandler::sendTotals()
void TCPHandler::sendTotals(const Block & totals)
{
const Block & totals = state.io.in->getTotals();
if (totals)
{
initBlockOutput(totals);
@ -521,10 +651,8 @@ void TCPHandler::sendTotals()
}
void TCPHandler::sendExtremes()
void TCPHandler::sendExtremes(const Block & extremes)
{
Block extremes = state.io.in->getExtremes();
if (extremes)
{
initBlockOutput(extremes);
@ -730,7 +858,7 @@ bool TCPHandler::receiveData()
if (!(storage = query_context->tryGetExternalTable(external_table_name)))
{
NamesAndTypesList columns = block.getNamesAndTypesList();
storage = StorageMemory::create(external_table_name, ColumnsDescription{columns});
storage = StorageMemory::create("_external", external_table_name, ColumnsDescription{columns});
storage->startup();
query_context->addExternalTable(external_table_name, storage);
}

View File

@ -145,6 +145,8 @@ private:
/// Process a request that does not require the receiving of data blocks from the client
void processOrdinaryQuery();
void processOrdinaryQueryWithProcessors(size_t num_threads);
void processTablesStatusRequest();
void sendHello();
@ -155,9 +157,9 @@ private:
void sendProgress();
void sendLogs();
void sendEndOfStream();
void sendProfileInfo();
void sendTotals();
void sendExtremes();
void sendProfileInfo(const BlockStreamProfileInfo & info);
void sendTotals(const Block & totals);
void sendExtremes(const Block & extremes);
/// Creates state.block_in/block_out for blocks read/write, depending on whether compression is enabled.
void initBlockInput();

View File

@ -45,11 +45,11 @@ namespace
/// These default parameters were picked because they did well on some tests,
/// though the parameters still need to be tuned to achieve better results
auto learning_rate = Float64(0.01);
auto l2_reg_coef = Float64(0.1);
UInt32 batch_size = 15;
auto learning_rate = Float64(1.0);
auto l2_reg_coef = Float64(0.5);
UInt64 batch_size = 15;
std::string weights_updater_name = "SGD";
std::string weights_updater_name = "Adam";
std::unique_ptr<IGradientComputer> gradient_computer;
if (!parameters.empty())
@ -62,12 +62,12 @@ namespace
}
if (parameters.size() > 2)
{
batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt32>(), parameters[2]);
batch_size = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), parameters[2]);
}
if (parameters.size() > 3)
{
weights_updater_name = parameters[3].safeGet<String>();
if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov")
if (weights_updater_name != "SGD" && weights_updater_name != "Momentum" && weights_updater_name != "Nesterov" && weights_updater_name != "Adam")
throw Exception("Invalid parameter for weights updater. The only supported are 'SGD', 'Momentum' and 'Nesterov'",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
@ -106,8 +106,8 @@ void registerAggregateFunctionMLMethod(AggregateFunctionFactory & factory)
LinearModelData::LinearModelData(
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 param_num,
UInt32 batch_capacity,
UInt64 param_num,
UInt64 batch_capacity,
std::shared_ptr<DB::IGradientComputer> gradient_computer,
std::shared_ptr<DB::IWeightsUpdater> weights_updater)
: learning_rate(learning_rate)
@ -126,7 +126,7 @@ void LinearModelData::update_state()
if (batch_size == 0)
return;
weights_updater->update(batch_size, weights, bias, gradient_batch);
weights_updater->update(batch_size, weights, bias, learning_rate, gradient_batch);
batch_size = 0;
++iter_num;
gradient_batch.assign(gradient_batch.size(), Float64{0.0});
@ -191,6 +191,7 @@ void LinearModelData::merge(const DB::LinearModelData & rhs)
update_state();
/// can't update rhs state because it's constant
/// the squared mean is more stable (in the sense of prediction quality) when two states with quite different numbers of learning steps are merged
Float64 frac = (static_cast<Float64>(iter_num) * iter_num) / (iter_num * iter_num + rhs.iter_num * rhs.iter_num);
for (size_t i = 0; i < weights.size(); ++i)
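
A hedged reading of the `frac` line above: the two states are blended with weights proportional to the squared iteration counts, so the state that has made more learning steps dominates the merge. The loop body is cut off in this hunk; presumably each weight is combined as

    frac = t_lhs^2 / (t_lhs^2 + t_rhs^2)
    w_i  = frac * w_i(lhs) + (1 - frac) * w_i(rhs)
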
@ -210,7 +211,7 @@ void LinearModelData::add(const IColumn ** columns, size_t row_num)
/// Here we have columns + 1, as the first column corresponds to the target value and the rest to the features
weights_updater->add_to_batch(
gradient_batch, *gradient_computer, weights, bias, learning_rate, l2_reg_coef, target, columns + 1, row_num);
gradient_batch, *gradient_computer, weights, bias, l2_reg_coef, target, columns + 1, row_num);
++batch_size;
if (batch_size == batch_capacity)
@ -219,6 +220,90 @@ void LinearModelData::add(const IColumn ** columns, size_t row_num)
}
}
/// Weights updaters
void Adam::write(WriteBuffer & buf) const
{
writeBinary(average_gradient, buf);
writeBinary(average_squared_gradient, buf);
}
void Adam::read(ReadBuffer & buf)
{
readBinary(average_gradient, buf);
readBinary(average_squared_gradient, buf);
}
void Adam::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
auto & adam_rhs = static_cast<const Adam &>(rhs);
if (average_gradient.empty())
{
if (!average_squared_gradient.empty() ||
adam_rhs.average_gradient.size() != adam_rhs.average_squared_gradient.size())
throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);
average_gradient.resize(adam_rhs.average_gradient.size(), Float64{0.0});
average_squared_gradient.resize(adam_rhs.average_squared_gradient.size(), Float64{0.0});
}
for (size_t i = 0; i < average_gradient.size(); ++i)
{
average_gradient[i] = average_gradient[i] * frac + adam_rhs.average_gradient[i] * rhs_frac;
average_squared_gradient[i] = average_squared_gradient[i] * frac + adam_rhs.average_squared_gradient[i] * rhs_frac;
}
beta1_powered_ *= adam_rhs.beta1_powered_;
beta2_powered_ *= adam_rhs.beta2_powered_;
}
void Adam::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
if (average_gradient.empty())
{
if (!average_squared_gradient.empty())
throw Exception("Average_gradient and average_squared_gradient must have same size", ErrorCodes::LOGICAL_ERROR);
average_gradient.resize(batch_gradient.size(), Float64{0.0});
average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
}
for (size_t i = 0; i != average_gradient.size(); ++i)
{
Float64 normed_gradient = batch_gradient[i] / batch_size;
average_gradient[i] = beta1_ * average_gradient[i] + (1 - beta1_) * normed_gradient;
average_squared_gradient[i] = beta2_ * average_squared_gradient[i] +
(1 - beta2_) * normed_gradient * normed_gradient;
}
for (size_t i = 0; i < weights.size(); ++i)
{
weights[i] += (learning_rate * average_gradient[i]) /
((1 - beta1_powered_) * (sqrt(average_squared_gradient[i] / (1 - beta2_powered_)) + eps_));
}
bias += (learning_rate * average_gradient[weights.size()]) /
((1 - beta1_powered_) * (sqrt(average_squared_gradient[weights.size()] / (1 - beta2_powered_)) + eps_));
beta1_powered_ *= beta1_;
beta2_powered_ *= beta2_;
}
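
For reference, the update implemented above is standard Adam with bias correction (a reading of the code, not an authoritative spec): g_t below is the batch-averaged gradient, average_gradient/average_squared_gradient hold the moment estimates m_t/v_t, and beta1_powered_/beta2_powered_ hold \beta_1^t and \beta_2^t:

    m_t = \beta_1 m_{t-1} + (1 - \beta_1) g_t
    v_t = \beta_2 v_{t-1} + (1 - \beta_2) g_t^2
    w_t = w_{t-1} + \eta * ( m_t / (1 - \beta_1^t) ) / ( \sqrt{ v_t / (1 - \beta_2^t) } + \varepsilon )

Factoring (1 - \beta_1^t) out of the code's denominator shows the two forms are equivalent; the `+=` matches the ascent sign convention used by the gradient computers in this file.
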
void Adam::add_to_batch(
std::vector<Float64> & batch_gradient,
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num)
{
if (average_gradient.empty())
{
average_gradient.resize(batch_gradient.size(), Float64{0.0});
average_squared_gradient.resize(batch_gradient.size(), Float64{0.0});
}
gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}
void Nesterov::read(ReadBuffer & buf)
{
@ -233,13 +318,16 @@ void Nesterov::write(WriteBuffer & buf) const
void Nesterov::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac)
{
auto & nesterov_rhs = static_cast<const Nesterov &>(rhs);
if (accumulated_gradient.empty())
accumulated_gradient.resize(nesterov_rhs.accumulated_gradient.size(), Float64{0.0});
for (size_t i = 0; i < accumulated_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * frac + nesterov_rhs.accumulated_gradient[i] * rhs_frac;
}
}
void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
void Nesterov::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
if (accumulated_gradient.empty())
{
@ -248,7 +336,7 @@ void Nesterov::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
for (size_t i = 0; i < batch_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
}
for (size_t i = 0; i < weights.size(); ++i)
{
@ -262,7 +350,6 @@ void Nesterov::add_to_batch(
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -280,7 +367,7 @@ void Nesterov::add_to_batch(
}
auto shifted_bias = bias + accumulated_gradient[weights.size()] * alpha_;
gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, learning_rate, l2_reg_coef, target, columns, row_num);
gradient_computer.compute(batch_gradient, shifted_weights, shifted_bias, l2_reg_coef, target, columns, row_num);
}
void Momentum::read(ReadBuffer & buf)
@ -302,7 +389,7 @@ void Momentum::merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac
}
}
void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
void Momentum::update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
/// batch_size is already checked to be greater than 0
if (accumulated_gradient.empty())
@ -312,7 +399,7 @@ void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
for (size_t i = 0; i < batch_gradient.size(); ++i)
{
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + batch_gradient[i] / batch_size;
accumulated_gradient[i] = accumulated_gradient[i] * alpha_ + (learning_rate * batch_gradient[i]) / batch_size;
}
for (size_t i = 0; i < weights.size(); ++i)
{
@ -322,14 +409,14 @@ void Momentum::update(UInt32 batch_size, std::vector<Float64> & weights, Float64
}
void StochasticGradientDescent::update(
UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient)
UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient)
{
/// batch_size is already checked to be greater than 0
for (size_t i = 0; i < weights.size(); ++i)
{
weights[i] += batch_gradient[i] / batch_size;
weights[i] += (learning_rate * batch_gradient[i]) / batch_size;
}
bias += batch_gradient[weights.size()] / batch_size;
bias += (learning_rate * batch_gradient[weights.size()]) / batch_size;
}
void IWeightsUpdater::add_to_batch(
@ -337,15 +424,16 @@ void IWeightsUpdater::add_to_batch(
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num)
{
gradient_computer.compute(batch_gradient, weights, bias, learning_rate, l2_reg_coef, target, columns, row_num);
gradient_computer.compute(batch_gradient, weights, bias, l2_reg_coef, target, columns, row_num);
}
/// Gradient computers
void LogisticRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
@ -387,7 +475,6 @@ void LogisticRegression::compute(
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -402,11 +489,11 @@ void LogisticRegression::compute(
derivative *= target;
derivative = exp(derivative);
batch_gradient[weights.size()] += learning_rate * target / (derivative + 1);
batch_gradient[weights.size()] += target / (derivative + 1);
for (size_t i = 0; i < weights.size(); ++i)
{
auto value = (*columns[i]).getFloat64(row_num);
batch_gradient[i] += learning_rate * target * value / (derivative + 1) - 2 * learning_rate * l2_reg_coef * weights[i];
batch_gradient[i] += target * value / (derivative + 1) - 2 * l2_reg_coef * weights[i];
}
}
@ -459,7 +546,6 @@ void LinearRegression::compute(
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -471,13 +557,13 @@ void LinearRegression::compute(
auto value = (*columns[i]).getFloat64(row_num);
derivative -= weights[i] * value;
}
derivative *= (2 * learning_rate);
derivative *= 2;
batch_gradient[weights.size()] += derivative;
for (size_t i = 0; i < weights.size(); ++i)
{
auto value = (*columns[i]).getFloat64(row_num);
batch_gradient[i] += derivative * value - 2 * learning_rate * l2_reg_coef * weights[i];
batch_gradient[i] += derivative * value - 2 * l2_reg_coef * weights[i];
}
}
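
To summarize the refactor in this file: the gradient computers now accumulate the raw gradient, and the learning rate is applied exactly once inside the weights updater. For LinearRegression, with prediction \hat{y} = w \cdot x + b and L2 penalty \lambda \lVert w \rVert^2, the per-row contribution accumulated above is the ascent direction of the negative squared error (a hedged reading of the code):

    \partial/\partial b   : 2 (y - w \cdot x - b)
    \partial/\partial w_i : 2 (y - w \cdot x - b) x_i - 2 \lambda w_i
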

View File

@ -33,7 +33,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -60,7 +59,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -87,7 +85,6 @@ public:
std::vector<Float64> & batch_gradient,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
@ -120,14 +117,18 @@ public:
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num);
/// Updates current weights according to the gradient from the last mini-batch
virtual void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & gradient) = 0;
virtual void update(
UInt64 batch_size,
std::vector<Float64> & weights,
Float64 & bias,
Float64 learning_rate,
const std::vector<Float64> & gradient) = 0;
/// Used during the merge of two states
virtual void merge(const IWeightsUpdater &, Float64, Float64) {}
@ -143,7 +144,7 @@ public:
class StochasticGradientDescent : public IWeightsUpdater
{
public:
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
};
@ -154,7 +155,7 @@ public:
Momentum(Float64 alpha) : alpha_(alpha) {}
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
@ -180,13 +181,12 @@ public:
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 learning_rate,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num) override;
void update(UInt32 batch_size, std::vector<Float64> & weights, Float64 & bias, const std::vector<Float64> & batch_gradient) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
@ -195,11 +195,51 @@ public:
void read(ReadBuffer & buf) override;
private:
Float64 alpha_{0.1};
const Float64 alpha_ = 0.9;
std::vector<Float64> accumulated_gradient;
};
class Adam : public IWeightsUpdater
{
public:
Adam()
{
beta1_powered_ = beta1_;
beta2_powered_ = beta2_;
}
void add_to_batch(
std::vector<Float64> & batch_gradient,
IGradientComputer & gradient_computer,
const std::vector<Float64> & weights,
Float64 bias,
Float64 l2_reg_coef,
Float64 target,
const IColumn ** columns,
size_t row_num) override;
void update(UInt64 batch_size, std::vector<Float64> & weights, Float64 & bias, Float64 learning_rate, const std::vector<Float64> & batch_gradient) override;
virtual void merge(const IWeightsUpdater & rhs, Float64 frac, Float64 rhs_frac) override;
void write(WriteBuffer & buf) const override;
void read(ReadBuffer & buf) override;
private:
/// Recommended values for the beta1 and beta2 hyperparameters
const Float64 beta1_ = 0.9;
const Float64 beta2_ = 0.999;
const Float64 eps_ = 0.000001;
Float64 beta1_powered_;
Float64 beta2_powered_;
std::vector<Float64> average_gradient;
std::vector<Float64> average_squared_gradient;
};
/** LinearModelData is a class which manages current state of learning
*/
class LinearModelData
@ -210,8 +250,8 @@ public:
LinearModelData(
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 param_num,
UInt32 batch_capacity,
UInt64 param_num,
UInt64 batch_capacity,
std::shared_ptr<IGradientComputer> gradient_computer,
std::shared_ptr<IWeightsUpdater> weights_updater);
@ -269,7 +309,7 @@ public:
std::string weights_updater_name,
Float64 learning_rate,
Float64 l2_reg_coef,
UInt32 batch_size,
UInt64 batch_size,
const DataTypes & arguments_types,
const Array & params)
: IAggregateFunctionDataHelper<Data, AggregateFunctionMLMethod<Data, Name>>(arguments_types, params)
@ -303,6 +343,8 @@ public:
new_weights_updater = std::make_shared<Momentum>();
else if (weights_updater_name == "Nesterov")
new_weights_updater = std::make_shared<Nesterov>();
else if (weights_updater_name == "Adam")
new_weights_updater = std::make_shared<Adam>();
else
throw Exception("Illegal name of weights updater (should have been checked earlier)", ErrorCodes::LOGICAL_ERROR);
@ -355,10 +397,10 @@ public:
const char * getHeaderFilePath() const override { return __FILE__; }
private:
UInt32 param_num;
UInt64 param_num;
Float64 learning_rate;
Float64 l2_reg_coef;
UInt32 batch_size;
UInt64 batch_size;
std::shared_ptr<IGradientComputer> gradient_computer;
std::string weights_updater_name;
};
@ -371,4 +413,5 @@ struct NameLogisticRegression
{
static constexpr auto name = "stochasticLogisticRegression";
};
}

View File

@ -82,14 +82,6 @@ public:
{
if (!returns_many && levels.size() > 1)
throw Exception("Aggregate function " + getName() + " require one parameter or less", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if constexpr (std::is_same_v<Data, QuantileTiming<Value>>)
{
/// QuantileTiming only supports integers (it works only for unsigned integers but signed are also accepted for convenience).
if (!isInteger(argument_type))
throw Exception("Argument for function " + std::string(Name::name) + " must be integer, but it has type "
+ argument_type->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
}
String getName() const override { return Name::name; }
@ -111,16 +103,21 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
/// Out of range conversion may occur. This is Ok.
auto value = static_cast<const ColVecType &>(*columns[0]).getData()[row_num];
const auto & column = static_cast<const ColVecType &>(*columns[0]);
if constexpr (std::is_same_v<Data, QuantileTiming<Value>>)
{
/// QuantileTiming only supports integers.
if (isNaN(value) || value > std::numeric_limits<Value>::max() || value < std::numeric_limits<Value>::min())
return;
}
if constexpr (has_second_arg)
this->data(place).add(
column.getData()[row_num],
value,
columns[1]->getUInt(row_num));
else
this->data(place).add(column.getData()[row_num]);
this->data(place).add(value);
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override

View File

@ -1,7 +1,6 @@
#pragma once
#include <Common/HashTable/Hash.h>
#include <Common/MemoryTracker.h>
#include <Common/PODArray.h>
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
@ -513,8 +512,6 @@ private:
void mediumToLarge()
{
CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
/// While the data is copied from medium, it is not possible to set `large` value (otherwise it will overwrite some data).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@ -528,8 +525,6 @@ private:
void tinyToLarge()
{
CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
/// While the data is copied from `medium` it is not possible to set `large` value (otherwise it will overwrite some data).
detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@ -562,8 +557,6 @@ public:
else if (kind == Kind::Large)
{
delete large;
CurrentMemoryTracker::free(sizeof(detail::QuantileTimingLarge));
}
}

View File

@ -126,20 +126,32 @@ private:
{
for (size_t i = 0; i < buf_size(); ++i)
{
if (buf[i] && !good(buf[i]))
if (buf[i])
{
buf[i] = 0;
--m_size;
if (!good(buf[i]))
{
buf[i] = 0;
--m_size;
}
/** After removing elements, there may now be room for items
* that were placed further away than necessary due to a collision.
* They need to be moved.
*/
else if (i != place(buf[i]))
{
HashValue x = buf[i];
buf[i] = 0;
reinsertImpl(x);
}
}
}
/** After removing the elements, there may have been room for items,
* which were placed further than necessary, due to a collision.
* You need to move them.
/** We must process the first collision-resolution chain once again.
* See the comment in the "resize" function.
*/
for (size_t i = 0; i < buf_size(); ++i)
for (size_t i = 0; i < buf_size() && buf[i]; ++i)
{
if (unlikely(buf[i] && i != place(buf[i])))
if (i != place(buf[i]))
{
HashValue x = buf[i];
buf[i] = 0;

View File

@ -12,5 +12,6 @@ add_subdirectory (Interpreters)
add_subdirectory (AggregateFunctions)
add_subdirectory (Client)
add_subdirectory (TableFunctions)
add_subdirectory (Processors)
add_subdirectory (Formats)
add_subdirectory (Compression)

View File

@ -73,7 +73,7 @@ void Connection::connect(const ConnectionTimeouts & timeouts)
current_resolved_address = DNSResolver::instance().resolveAddress(host, port);
socket->connect(current_resolved_address, timeouts.connection_timeout);
socket->connect(*current_resolved_address, timeouts.connection_timeout);
socket->setReceiveTimeout(timeouts.receive_timeout);
socket->setSendTimeout(timeouts.send_timeout);
socket->setNoDelay(true);
@ -533,12 +533,9 @@ void Connection::sendExternalTablesData(ExternalTablesData & data)
LOG_DEBUG(log_wrapper.get(), msg.rdbuf());
}
Poco::Net::SocketAddress Connection::getResolvedAddress() const
std::optional<Poco::Net::SocketAddress> Connection::getResolvedAddress() const
{
if (connected)
return current_resolved_address;
return DNSResolver::instance().resolveAddress(host, port);
return current_resolved_address;
}
@ -595,7 +592,9 @@ Connection::Packet Connection::receivePacket()
switch (res.type)
{
case Protocol::Server::Data:
case Protocol::Server::Data: [[fallthrough]];
case Protocol::Server::Totals: [[fallthrough]];
case Protocol::Server::Extremes:
res.block = receiveData();
return res;
@ -611,16 +610,6 @@ Connection::Packet Connection::receivePacket()
res.profile_info = receiveProfileInfo();
return res;
case Protocol::Server::Totals:
/// The block with total values is passed in the same form as an ordinary block. The only difference is the packet id.
res.block = receiveData();
return res;
case Protocol::Server::Extremes:
/// Same as above.
res.block = receiveData();
return res;
case Protocol::Server::Log:
res.block = receiveLogData();
return res;
@ -720,11 +709,14 @@ void Connection::initBlockLogsInput()
void Connection::setDescription()
{
auto resolved_address = getResolvedAddress();
description = host + ":" + toString(resolved_address.port());
auto ip_address = resolved_address.host().toString();
description = host + ":" + toString(port);
if (host != ip_address)
description += ", " + ip_address;
if (resolved_address)
{
auto ip_address = resolved_address->host().toString();
if (host != ip_address)
description += ", " + ip_address;
}
}

View File

@ -63,7 +63,7 @@ public:
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_), current_resolved_address(host, port),
user(user_), password(password_),
client_name(client_name_),
compression(compression_),
secure(secure_),
@ -168,9 +168,6 @@ public:
size_t outBytesCount() const { return out ? out->count() : 0; }
size_t inBytesCount() const { return in ? in->count() : 0; }
/// Returns initially resolved address
Poco::Net::SocketAddress getResolvedAddress() const;
private:
String host;
UInt16 port;
@ -180,12 +177,15 @@ private:
/// Address is resolved during the first connection (or the following reconnects)
/// Use it only for logging purposes
Poco::Net::SocketAddress current_resolved_address;
std::optional<Poco::Net::SocketAddress> current_resolved_address;
/// For messages in log and in exceptions.
String description;
void setDescription();
/// Returns the resolved address, if it was resolved.
std::optional<Poco::Net::SocketAddress> getResolvedAddress() const;
String client_name;
bool connected = false;

View File

@ -6,8 +6,8 @@
#include <unicode/ucol.h>
#else
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wunused-private-field"
#pragma clang diagnostic ignored "-Wmissing-noreturn"
#endif
#endif
@ -26,7 +26,6 @@ namespace DB
}
}
Collator::Collator(const std::string & locale_) : locale(Poco::toLower(locale_))
{
#if USE_ICU

View File

@ -178,11 +178,16 @@ StringRef ColumnArray::serializeValueIntoArena(size_t n, Arena & arena, char con
char * pos = arena.allocContinue(sizeof(array_size), begin);
memcpy(pos, &array_size, sizeof(array_size));
size_t values_size = 0;
for (size_t i = 0; i < array_size; ++i)
values_size += getData().serializeValueIntoArena(offset + i, arena, begin).size;
StringRef res(pos, sizeof(array_size));
return StringRef(begin, sizeof(array_size) + values_size);
for (size_t i = 0; i < array_size; ++i)
{
auto value_ref = getData().serializeValueIntoArena(offset + i, arena, begin);
res.data = value_ref.data - res.size;
res.size += value_ref.size;
}
return res;
}
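
The same fix pattern recurs in ColumnNullable, ColumnTuple and ColumnUnique below: Arena::allocContinue may relocate the contiguous range it extends, so the resulting StringRef must be rebuilt from the last returned pointer instead of a pointer captured before the nested serialization. A minimal standalone illustration with a toy arena (ToyArena is a hypothetical stand-in, not ClickHouse's Arena):

#include <cstring>
#include <iostream>
#include <string>
#include <vector>

/// Toy stand-in for Arena::allocContinue: extends a contiguous range and may
/// relocate it (like realloc), fixing up `begin` to the range's new start.
struct ToyArena
{
    std::vector<char> buf;

    char * allocContinue(size_t size, const char *& begin)
    {
        size_t begin_offset = begin ? static_cast<size_t>(begin - buf.data()) : buf.size();
        size_t pos = buf.size();
        buf.resize(buf.size() + size);   /// may move the whole range
        begin = buf.data() + begin_offset;
        return buf.data() + pos;
    }
};

int main()
{
    ToyArena arena;
    const char * begin = nullptr;

    char * p1 = arena.allocContinue(4, begin);
    std::memcpy(p1, "head", 4);                  /// p1 may dangle after the next call

    char * p2 = arena.allocContinue(4, begin);   /// relocation can happen here
    std::memcpy(p2, "tail", 4);

    /// Wrong: std::string(p1, 8), since p1 may point into freed memory.
    /// Right (what the patch does): recompute the start from the latest pointer.
    std::string res(p2 - 4, 8);
    std::cout << res << '\n';                    /// prints "headtail"
}
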

View File

@ -103,12 +103,13 @@ StringRef ColumnNullable::serializeValueIntoArena(size_t n, Arena & arena, char
auto pos = arena.allocContinue(s, begin);
memcpy(pos, &arr[n], s);
size_t nested_size = 0;
if (arr[n])
return StringRef(pos, s);
if (arr[n] == 0)
nested_size = getNestedColumn().serializeValueIntoArena(n, arena, begin).size;
auto nested_ref = getNestedColumn().serializeValueIntoArena(n, arena, begin);
return StringRef{begin, s + nested_size};
/// serializeValueIntoArena may reallocate the arena memory, so the start of the value must be recomputed by stepping back from nested_ref.data.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
const char * ColumnNullable::deserializeAndInsertFromArena(const char * pos)

View File

@ -142,11 +142,15 @@ void ColumnTuple::popBack(size_t n)
StringRef ColumnTuple::serializeValueIntoArena(size_t n, Arena & arena, char const *& begin) const
{
size_t values_size = 0;
StringRef res(begin, 0);
for (auto & column : columns)
values_size += column->serializeValueIntoArena(n, arena, begin).size;
{
auto value_ref = column->serializeValueIntoArena(n, arena, begin);
res.data = value_ref.data - res.size;
res.size += value_ref.size;
}
return StringRef(begin, values_size);
return res;
}
const char * ColumnTuple::deserializeAndInsertFromArena(const char * pos)

View File

@ -300,19 +300,19 @@ StringRef ColumnUnique<ColumnType>::serializeValueIntoArena(size_t n, Arena & ar
{
if (is_nullable)
{
const UInt8 null_flag = 1;
const UInt8 not_null_flag = 0;
static constexpr auto s = sizeof(UInt8);
auto pos = arena.allocContinue(sizeof(null_flag), begin);
auto & flag = (n == getNullValueIndex() ? null_flag : not_null_flag);
memcpy(pos, &flag, sizeof(flag));
auto pos = arena.allocContinue(s, begin);
UInt8 flag = (n == getNullValueIndex() ? 1 : 0);
unalignedStore<UInt8>(pos, flag);
size_t nested_size = 0;
if (n == getNullValueIndex())
return StringRef(pos, s);
if (n != getNullValueIndex())
nested_size = column_holder->serializeValueIntoArena(n, arena, begin).size;
auto nested_ref = column_holder->serializeValueIntoArena(n, arena, begin);
return StringRef(pos, sizeof(null_flag) + nested_size);
/// serializeValueIntoArena may reallocate the arena memory, so the start of the value must be recomputed by stepping back from nested_ref.data.
return StringRef(nested_ref.data - s, nested_ref.size + s);
}
return column_holder->serializeValueIntoArena(n, arena, begin);

View File

@ -1,4 +0,0 @@
if(USE_GTEST)
add_executable(column_unique column_unique.cpp)
target_link_libraries(column_unique PRIVATE dbms ${GTEST_BOTH_LIBRARIES})
endif()

View File

@ -7,12 +7,6 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
#include <unordered_map>

View File

@ -108,13 +108,92 @@ class AllocatorWithHint : Hint
{
protected:
static constexpr bool clear_memory = clear_memory_;
static constexpr size_t small_memory_threshold = mmap_threshold;
public:
/// Allocate memory range.
void * alloc(size_t size, size_t alignment = 0)
{
CurrentMemoryTracker::alloc(size);
return allocNoTrack(size, alignment);
}
/// Free memory range.
void free(void * buf, size_t size)
{
freeNoTrack(buf, size);
CurrentMemoryTracker::free(size);
}
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if constexpr (clear_memory)
if (new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On Apple and FreeBSD, a self-implemented mremap is used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else if (new_size < small_memory_threshold)
{
/// Small allocations that require a copy. Assume there is enough memory in the system. Call CurrentMemoryTracker once.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = allocNoTrack(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
freeNoTrack(buf, old_size);
buf = new_buf;
}
else
{
/// Big allocations that require a copy. MemoryTracker is called inside the 'alloc' and 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
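
To make the branch structure above easier to scan, a standalone sketch of the path selection, assuming mmap_threshold == small_memory_threshold (64 MiB is a hypothetical value here) and default alignment:

#include <cstddef>
#include <cstdio>

static constexpr size_t threshold = 64ULL << 20;   /// assumed mmap/small threshold

const char * realloc_path(size_t old_size, size_t new_size)
{
    if (old_size == new_size)
        return "no-op";
    if (old_size < threshold && new_size < threshold)
        return "::realloc";                              /// both small
    if (old_size >= threshold && new_size >= threshold)
        return "clickhouse_mremap";                      /// both mmap'd
    if (new_size < threshold)
        return "allocNoTrack + memcpy + freeNoTrack";    /// crossing, small new size
    return "alloc + memcpy + free";                      /// crossing, big new size
}

int main()
{
    std::printf("%s\n", realloc_path(16 << 10, 32 << 10));          /// ::realloc
    std::printf("%s\n", realloc_path(128ULL << 20, 256ULL << 20));  /// mremap
    std::printf("%s\n", realloc_path(128ULL << 20, 1 << 20));       /// shrink across threshold
    std::printf("%s\n", realloc_path(16 << 10, 128ULL << 20));      /// grow across threshold
}
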
protected:
static constexpr size_t getStackThreshold()
{
return 0;
}
private:
void * allocNoTrack(size_t size, size_t alignment)
{
void * buf;
if (size >= mmap_threshold)
@ -149,15 +228,14 @@ public:
if (0 != res)
DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);
if (clear_memory)
if constexpr (clear_memory)
memset(buf, 0, size);
}
}
return buf;
}
/// Free memory range.
void free(void * buf, size_t size)
void freeNoTrack(void * buf, size_t size)
{
if (size >= mmap_threshold)
{
@ -168,63 +246,6 @@ public:
{
::free(buf);
}
CurrentMemoryTracker::free(size);
}
/** Enlarge memory range.
* Data from old range is moved to the beginning of new range.
* Address of memory range could change.
*/
void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
{
if (old_size == new_size)
{
/// nothing to do.
/// BTW, it's not possible to change alignment while doing realloc.
}
else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
{
/// Resize malloc'd memory region with no special alignment requirement.
CurrentMemoryTracker::realloc(old_size, new_size);
void * new_buf = ::realloc(buf, new_size);
if (nullptr == new_buf)
DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
buf = new_buf;
if (clear_memory && new_size > old_size)
memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
}
else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
{
/// Resize mmap'd memory region.
CurrentMemoryTracker::realloc(old_size, new_size);
// On apple and freebsd self-implemented mremap used (common/mremap.h)
buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
if (MAP_FAILED == buf)
DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
/// No need for zero-fill, because mmap guarantees it.
}
else
{
/// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods.
void * new_buf = alloc(new_size, alignment);
memcpy(new_buf, buf, std::min(old_size, new_size));
free(buf, old_size);
buf = new_buf;
}
return buf;
}
protected:
static constexpr size_t getStackThreshold()
{
return 0;
}
};
@ -267,7 +288,7 @@ public:
{
if (size <= N)
{
if (Base::clear_memory)
if constexpr (Base::clear_memory)
memset(stack_memory, 0, N);
return stack_memory;
}

View File

@ -3,7 +3,6 @@
#include <Common/HashTable/SmallTable.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HyperLogLogCounter.h>
#include <Common/MemoryTracker.h>
#include <Core/Defines.h>
@ -230,7 +229,6 @@ private:
if (getContainerType() != details::ContainerType::SMALL)
throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
CurrentMemoryTracker::alloc(sizeof(Medium));
auto tmp_medium = std::make_unique<Medium>();
for (const auto & x : small)
@ -247,7 +245,6 @@ private:
if ((container_type != details::ContainerType::SMALL) && (container_type != details::ContainerType::MEDIUM))
throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);
CurrentMemoryTracker::alloc(sizeof(Large));
auto tmp_large = std::make_unique<Large>();
if (container_type == details::ContainerType::SMALL)
@ -277,15 +274,11 @@ private:
{
delete medium;
medium = nullptr;
CurrentMemoryTracker::free(sizeof(Medium));
}
else if (container_type == details::ContainerType::LARGE)
{
delete large;
large = nullptr;
CurrentMemoryTracker::free(sizeof(Large));
}
}

View File

@ -14,8 +14,9 @@ namespace DB
namespace Cpu
{
#if defined(__x86_64__) || defined(__i386__)
inline UInt64 _xgetbv(UInt32 xcr) noexcept
#if (defined(__x86_64__) || defined(__i386__))
/// Our version is independent of the -mxsave option, because we do dynamic dispatch.
inline UInt64 our_xgetbv(UInt32 xcr) noexcept
{
UInt32 eax;
UInt32 edx;
@ -185,7 +186,7 @@ bool haveAVX() noexcept
// http://www.intel.com/content/dam/www/public/us/en/documents/manuals/64-ia-32-architectures-optimization-manual.pdf
// https://bugs.chromium.org/p/chromium/issues/detail?id=375968
return haveOSXSAVE() // implies haveXSAVE()
&& (_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((CpuInfo(0x1).ecx >> 28) & 1u); // AVX bit
#else
return false;
@ -217,8 +218,8 @@ bool haveAVX512F() noexcept
#if defined(__x86_64__) || defined(__i386__)
// https://software.intel.com/en-us/articles/how-to-detect-knl-instruction-support
return haveOSXSAVE() // implies haveXSAVE()
&& (_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((_xgetbv(0) >> 5) & 7u) == 7u // ZMM state is enabled by OS
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((our_xgetbv(0) >> 5) & 7u) == 7u // ZMM state is enabled by OS
&& CpuInfo(0x0).eax >= 0x7 // leaf 7 is present
&& ((CpuInfo(0x7).ebx >> 16) & 1u); // AVX512F bit
#else

View File

@ -23,7 +23,7 @@ void CurrentThread::updatePerformanceCounters()
{
if (unlikely(!current_thread))
return;
get().updatePerformanceCounters();
current_thread->updatePerformanceCounters();
}
ThreadStatus & CurrentThread::get()
@ -36,35 +36,42 @@ ThreadStatus & CurrentThread::get()
ProfileEvents::Counters & CurrentThread::getProfileEvents()
{
return current_thread ? get().performance_counters : ProfileEvents::global_counters;
return current_thread ? current_thread->performance_counters : ProfileEvents::global_counters;
}
MemoryTracker * CurrentThread::getMemoryTracker()
{
if (unlikely(!current_thread))
return nullptr;
return &get().memory_tracker;
return &current_thread->memory_tracker;
}
Int64 & CurrentThread::getUntrackedMemory()
{
/// It assumes that (current_thread != nullptr) is already checked with getMemoryTracker()
return current_thread->untracked_memory;
}
void CurrentThread::updateProgressIn(const Progress & value)
{
if (unlikely(!current_thread))
return;
get().progress_in.incrementPiecewiseAtomically(value);
current_thread->progress_in.incrementPiecewiseAtomically(value);
}
void CurrentThread::updateProgressOut(const Progress & value)
{
if (unlikely(!current_thread))
return;
get().progress_out.incrementPiecewiseAtomically(value);
current_thread->progress_out.incrementPiecewiseAtomically(value);
}
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
LogsLevel client_logs_level)
{
if (unlikely(!current_thread))
return;
get().attachInternalTextLogsQueue(logs_queue);
current_thread->attachInternalTextLogsQueue(logs_queue, client_logs_level);
}
std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
@ -73,10 +80,10 @@ std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()
if (unlikely(!current_thread))
return nullptr;
if (get().getCurrentState() == ThreadStatus::ThreadState::Died)
if (current_thread->getCurrentState() == ThreadStatus::ThreadState::Died)
return nullptr;
return get().getInternalTextLogsQueue();
return current_thread->getInternalTextLogsQueue();
}
ThreadGroupStatusPtr CurrentThread::getGroup()
@ -84,7 +91,7 @@ ThreadGroupStatusPtr CurrentThread::getGroup()
if (unlikely(!current_thread))
return nullptr;
return get().getThreadGroup();
return current_thread->getThreadGroup();
}
}

View File

@ -3,6 +3,7 @@
#include <memory>
#include <string>
#include <common/StringRef.h>
#include <Common/ThreadStatus.h>
@ -38,7 +39,8 @@ public:
static ThreadGroupStatusPtr getGroup();
/// A logs queue used by TCPHandler to pass logs to a client
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue);
static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
LogsLevel client_logs_level);
static std::shared_ptr<InternalTextLogsQueue> getInternalTextLogsQueue();
/// Makes system calls to update ProfileEvents that contain info from rusage and taskstats
@ -46,6 +48,7 @@ public:
static ProfileEvents::Counters & getProfileEvents();
static MemoryTracker * getMemoryTracker();
static Int64 & getUntrackedMemory();
/// Update read and write rows (bytes) statistics (used in system.query_thread_log)
static void updateProgressIn(const Progress & value);
@ -69,7 +72,7 @@ public:
static void finalizePerformanceCounters();
/// Returns a non-empty string if the thread is attached to a query
static const std::string & getQueryId();
static StringRef getQueryId();
/// Non-master threads call this method in destructor automatically
static void detachQuery();

View File

@ -433,6 +433,7 @@ namespace ErrorCodes
extern const int UNKNOWN_QUERY_PARAMETER = 456;
extern const int BAD_QUERY_PARAMETER = 457;
extern const int CANNOT_UNLINK = 458;
extern const int CANNOT_SET_THREAD_PRIORITY = 459;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -0,0 +1,47 @@
#pragma once
#include <cstdint>
#include <mutex>
#include <condition_variable>
/** Allows subscribing to multiple events and waiting for them one by one in arbitrary order.
*/
class EventCounter
{
private:
size_t events_happened = 0;
size_t events_waited = 0;
mutable std::mutex mutex;
std::condition_variable condvar;
public:
void notify()
{
{
std::lock_guard lock(mutex);
++events_happened;
}
condvar.notify_all();
}
void wait()
{
std::unique_lock lock(mutex);
condvar.wait(lock, [&]{ return events_happened > events_waited; });
++events_waited;
}
template <typename Duration>
bool waitFor(Duration && duration)
{
std::unique_lock lock(mutex);
if (condvar.wait(lock, std::forward<Duration>(duration), [&]{ return events_happened > events_waited; }))
{
++events_waited;
return true;
}
return false;
}
};
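
A minimal usage sketch, assuming the EventCounter defined above is visible in the translation unit: the producer may post events before or after the consumer starts waiting, and nothing is lost, because events are counted rather than merely signalled.

#include <iostream>
#include <thread>
/// EventCounter as defined above

int main()
{
    EventCounter counter;

    std::thread producer([&]
    {
        for (int i = 0; i < 3; ++i)
            counter.notify();       /// each call increments events_happened
    });

    for (int i = 0; i < 3; ++i)
        counter.wait();             /// consumes exactly one event per call

    producer.join();
    std::cout << "observed 3 events\n";
}
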

View File

@ -42,35 +42,40 @@ void FileChecker::update(const Files::const_iterator & begin, const Files::const
save();
}
bool FileChecker::check() const
CheckResults FileChecker::check() const
{
/** Read the files again on every call to `check`, so as not to violate constness.
* The `check` method is rarely called.
*/
CheckResults results;
Map local_map;
load(local_map, files_info_path);
if (local_map.empty())
return true;
return {};
for (const auto & name_size : local_map)
{
Poco::File file(Poco::Path(files_info_path).parent().toString() + "/" + name_size.first);
Poco::Path path = Poco::Path(files_info_path).parent().toString() + "/" + name_size.first;
Poco::File file(path);
if (!file.exists())
{
LOG_ERROR(log, "File " << file.path() << " doesn't exist");
return false;
results.emplace_back(path.getFileName(), false, "File " + file.path() + " doesn't exist");
break;
}
size_t real_size = file.getSize();
if (real_size != name_size.second)
{
LOG_ERROR(log, "Size of " << file.path() << " is wrong. Size is " << real_size << " but should be " << name_size.second);
return false;
results.emplace_back(path.getFileName(), false, "Size of " + file.path() + " is wrong. Size is " + toString(real_size) + " but should be " + toString(name_size.second));
break;
}
results.emplace_back(path.getFileName(), true, "");
}
return true;
return results;
}
void FileChecker::initialize()

View File

@ -3,6 +3,7 @@
#include <string>
#include <common/logger_useful.h>
#include <Poco/File.h>
#include <Storages/CheckResults.h>
namespace DB
@ -24,7 +25,7 @@ public:
void update(const Files::const_iterator & begin, const Files::const_iterator & end);
/// Check the files whose parameters are specified in sizes.json
bool check() const;
CheckResults check() const;
private:
void initialize();

View File

@ -4,7 +4,6 @@
#include <Common/HyperLogLogCounter.h>
#include <Common/HashTable/SmallTable.h>
#include <Common/MemoryTracker.h>
namespace DB
@ -39,8 +38,6 @@ private:
void toLarge()
{
CurrentMemoryTracker::alloc(sizeof(Large));
/// At the time of copying data from `tiny`, setting the value of `large` is still not possible (otherwise it will overwrite some data).
Large * tmp_large = new Large;
@ -56,11 +53,7 @@ public:
~HyperLogLogWithSmallSetOptimization()
{
if (isLarge())
{
delete large;
CurrentMemoryTracker::free(sizeof(Large));
}
}
void insert(Key value)

View File

@ -1,3 +1,5 @@
#include <cstdlib>
#include "MemoryTracker.h"
#include <common/likely.h>
#include <common/logger_useful.h>
@ -17,6 +19,8 @@ namespace DB
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
/// Each thread can new/delete memory within (-untracked_memory_limit, untracked_memory_limit) without touching the common counters.
static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;
MemoryTracker::~MemoryTracker()
@ -85,6 +89,9 @@ void MemoryTracker::alloc(Int64 size)
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel();
std::stringstream message;
message << "Memory tracker";
if (description)
@ -100,6 +107,9 @@ void MemoryTracker::alloc(Int64 size)
{
free(size);
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel();
std::stringstream message;
message << "Memory limit";
if (description)
@ -191,19 +201,41 @@ namespace CurrentMemoryTracker
void alloc(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->alloc(size);
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked += size;
if (untracked > untracked_memory_limit)
{
/// Zero `untracked` before tracking. If the tracker throws an out-of-limit exception, we will still be able to allocate up to untracked_memory_limit bytes
/// more, which can be useful for enlarging the exception message in the rethrow logic.
Int64 tmp = untracked;
untracked = 0;
memory_tracker->alloc(tmp);
}
}
}
void realloc(Int64 old_size, Int64 new_size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->alloc(new_size - old_size);
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}
void free(Int64 size)
{
if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
memory_tracker->free(size);
{
Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
untracked -= size;
if (untracked < -untracked_memory_limit)
{
memory_tracker->free(-untracked);
untracked = 0;
}
}
}
}
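
A standalone sketch of the batching idea above (an illustration only, not ClickHouse code): a thread-local signed counter absorbs small deltas and flushes into the shared atomic only when the limit is exceeded, so thousands of small allocations cost one atomic update instead of one each.

#include <atomic>
#include <cstdint>
#include <cstdio>

static std::atomic<int64_t> shared_amount{0};               /// stand-in for MemoryTracker
static constexpr int64_t untracked_limit = 4 * 1024 * 1024;
thread_local int64_t untracked = 0;

void track_alloc(int64_t size)
{
    untracked += size;
    if (untracked > untracked_limit)
    {
        int64_t tmp = untracked;
        untracked = 0;                                      /// zero before flushing, as in the patch
        shared_amount.fetch_add(tmp, std::memory_order_relaxed);
    }
}

void track_free(int64_t size)
{
    untracked -= size;
    if (untracked < -untracked_limit)
    {
        shared_amount.fetch_add(untracked, std::memory_order_relaxed);
        untracked = 0;
    }
}

int main()
{
    for (int i = 0; i < 5000; ++i)
        track_alloc(1024);          /// ~5 MiB in 1 KiB steps: flushes once, not 5000 times
    std::printf("shared=%lld untracked=%lld\n",
                static_cast<long long>(shared_amount.load()),
                static_cast<long long>(untracked));
}
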

View File

@ -45,7 +45,11 @@ public:
void realloc(Int64 old_size, Int64 new_size)
{
alloc(new_size - old_size);
Int64 addition = new_size - old_size;
if (addition > 0)
alloc(addition);
else
free(-addition);
}
/** This function should be called after memory deallocation.

View File

@ -5,15 +5,33 @@
#include "MiAllocator.h"
#include <mimalloc.h>
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_ALLOCATE_MEMORY;
}
void * MiAllocator::alloc(size_t size, size_t alignment)
{
void * ptr;
if (alignment == 0)
return mi_malloc(size);
{
ptr = mi_malloc(size);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
return mi_malloc_aligned(size, alignment);
{
ptr = mi_malloc_aligned(size, alignment);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot allocate in mimalloc (mi_malloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
return ptr;
}
void MiAllocator::free(void * buf, size_t)
@ -32,10 +50,21 @@ void * MiAllocator::realloc(void * old_ptr, size_t, size_t new_size, size_t alig
return nullptr;
}
if (alignment == 0)
return mi_realloc(old_ptr, alignment);
void * ptr;
return mi_realloc_aligned(old_ptr, new_size, alignment);
if (alignment == 0)
{
ptr = mi_realloc(old_ptr, new_size);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
else
{
ptr = mi_realloc_aligned(old_ptr, new_size, alignment);
if (!ptr)
DB::throwFromErrno("MiAllocator: Cannot reallocate in mimalloc (mi_realloc_aligned) " + formatReadableSizeWithBinarySuffix(size) + " with alignment " + toString(alignment) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
}
return ptr;
}
}

View File

@ -189,3 +189,16 @@ private:
Timestamp stop_ts;
bool is_running = false;
};
template <typename TStopwatch>
class StopwatchGuard : public TStopwatch
{
public:
explicit StopwatchGuard(UInt64 & elapsed_ns_) : elapsed_ns(elapsed_ns_) {}
~StopwatchGuard() { elapsed_ns += TStopwatch::elapsedNanoseconds(); }
private:
UInt64 & elapsed_ns;
};
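
A usage sketch, assuming (as in the Common/Stopwatch.h extended above) that Stopwatch starts timing on construction and provides elapsedNanoseconds(); do_work() is a hypothetical placeholder for the code being measured:

UInt64 measureLoop()
{
    UInt64 total_ns = 0;
    for (int i = 0; i < 10; ++i)
    {
        StopwatchGuard<Stopwatch> guard(total_ns);  /// starts timing on construction
        do_work();
    }   /// each guard's destructor adds its elapsed nanoseconds to total_ns
    return total_ns;
}
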

View File

@ -34,6 +34,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
// Replace NLMSG_OK with explicit casts since that system macro contains signedness bugs which are not going to be fixed.
static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
{
return len >= static_cast<ssize_t>(sizeof(*nlh)) && nlh->nlmsg_len >= sizeof(*nlh) && static_cast<size_t>(len) >= nlh->nlmsg_len;
}
namespace
{
@ -128,7 +133,7 @@ struct NetlinkMessage
if (header.nlmsg_type == NLMSG_ERROR)
throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);
if (!NLMSG_OK((&header), bytes_received))
if (!is_nlmsg_ok(&header, bytes_received))
throw Exception("Can't receive Netlink response: wrong number of bytes received", ErrorCodes::NETLINK_ERROR);
}
};

View File

@ -50,6 +50,19 @@ ThreadStatus::ThreadStatus()
ThreadStatus::~ThreadStatus()
{
try
{
if (untracked_memory > 0)
memory_tracker.alloc(untracked_memory);
else
memory_tracker.free(-untracked_memory);
}
catch (const DB::Exception &)
{
/// There is a minor leak of tracked memory here (not the memory itself, but its counter).
/// We have already allocated a little more than the limit and cannot track it in the thread memory tracker or its parent.
}
if (deleter)
deleter();
current_thread = nullptr;
@ -117,7 +130,8 @@ void ThreadStatus::assertState(const std::initializer_list<int> & permitted_stat
throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR);
}
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue)
void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
LogsLevel client_logs_level)
{
logs_queue_ptr = logs_queue;
@ -126,6 +140,7 @@ void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr &
std::lock_guard lock(thread_group->mutex);
thread_group->logs_queue_ptr = logs_queue;
thread_group->client_logs_level = client_logs_level;
}
}

View File

@ -1,8 +1,11 @@
#pragma once
#include <common/StringRef.h>
#include <Common/ProfileEvents.h>
#include <Common/MemoryTracker.h>
#include <Core/SettingsCommon.h>
#include <IO/Progress.h>
#include <memory>
@ -61,6 +64,8 @@ public:
UInt32 master_thread_number = 0;
Int32 master_thread_os_id = -1;
LogsLevel client_logs_level = LogsLevel::none;
String query;
};
@ -85,10 +90,14 @@ public:
UInt32 thread_number = 0;
/// Linux's PID (or TGID) (the same id is shown by ps util)
Int32 os_thread_id = -1;
/// Also called the "nice" value. If it was changed to non-zero when attaching the query, it will be reset to zero when the query is detached.
Int32 os_thread_priority = 0;
/// TODO: merge them into common entity
ProfileEvents::Counters performance_counters{VariableContext::Thread};
MemoryTracker memory_tracker{VariableContext::Thread};
/// Small amount of untracked memory (per thread atomic-less counter)
Int64 untracked_memory = 0;
/// Statistics of read and write rows/bytes
Progress progress_in;
@ -114,7 +123,7 @@ public:
return thread_state.load(std::memory_order_relaxed);
}
const std::string & getQueryId() const;
StringRef getQueryId() const;
/// Starts a new query and creates a new thread group for it; the current thread becomes the master thread of the query
void initializeQuery();
@ -127,7 +136,8 @@ public:
return thread_state == Died ? nullptr : logs_queue_ptr.lock();
}
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue);
void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
LogsLevel client_logs_level);
/// Sets the query context for the current thread and its thread group
/// NOTE: query_context has to be alive until detachQuery() is called

View File

@ -5,11 +5,6 @@
#include <iostream>
#include <chrono>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
#include <Common/ShellCommand.h>

View File

@ -153,7 +153,7 @@ void formatIPv6(const unsigned char * src, char *& dst, UInt8 zeroed_tail_bytes_
}
/// Was it a trailing run of 0x00's?
if (best.base != -1 && size_t(best.base + best.len) == words.size())
if (best.base != -1 && size_t(best.base) + size_t(best.len) == words.size())
*dst++ = ':';
*dst++ = '\0';

View File

@ -23,9 +23,7 @@ namespace DB
* - the routing rules that determine which network interface is used to reach the specified address are not checked.
*/
bool isLocalAddress(const Poco::Net::SocketAddress & address, UInt16 clickhouse_port);
bool isLocalAddress(const Poco::Net::SocketAddress & address);
bool isLocalAddress(const Poco::Net::IPAddress & address);
/// Returns number of different bytes in hostnames, used for load balancing

View File

@ -0,0 +1,143 @@
#include <new>
#include <common/config_common.h>
#include <common/memory.h>
#include <Common/MemoryTracker.h>
/// Replace default new/delete with memory tracking versions.
/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
/// https://en.cppreference.com/w/cpp/memory/new/operator_delete
#if NOT_UNBUNDLED
namespace Memory
{
ALWAYS_INLINE void trackMemory(std::size_t size)
{
#if USE_JEMALLOC
/// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
/// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
if (likely(size != 0))
CurrentMemoryTracker::alloc(nallocx(size, 0));
#else
CurrentMemoryTracker::alloc(size);
#endif
}
ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
{
try
{
trackMemory(size);
}
catch (...)
{
return false;
}
return true;
}
ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept
{
try
{
#if USE_JEMALLOC
/// @note It's also possible to use je_malloc_usable_size() here.
if (likely(ptr != nullptr))
CurrentMemoryTracker::free(sallocx(ptr, 0));
#else
if (size)
CurrentMemoryTracker::free(size);
#endif
}
catch (...)
{}
}
}
/// new
void * operator new(std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new[](std::size_t size)
{
Memory::trackMemory(size);
return Memory::newImpl(size);
}
void * operator new(std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
return Memory::newNoExept(size);
return nullptr;
}
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
{
if (likely(Memory::trackMemoryNoExept(size)))
return Memory::newNoExept(size);
return nullptr;
}
/// delete
/// C++17 std 21.6.2.1 (11)
/// If a function without a size parameter is defined, the program should also define the corresponding function with a size parameter.
/// If a function with a size parameter is defined, the program shall also define the corresponding version without the size parameter.
/// cppreference:
/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
/// incomplete type and arrays of non-class and trivially-destructible class types.
void operator delete(void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete[](void * ptr) noexcept
{
Memory::untrackMemory(ptr);
Memory::deleteImpl(ptr);
}
void operator delete(void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
void operator delete[](void * ptr, std::size_t size) noexcept
{
Memory::untrackMemory(ptr, size);
Memory::deleteSized(ptr, size);
}
#else
/// new
void * operator new(std::size_t size) { return Memory::newImpl(size); }
void * operator new[](std::size_t size) { return Memory::newImpl(size); }
void * operator new(std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
void * operator new[](std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
/// delete
void operator delete(void * ptr) noexcept { Memory::deleteImpl(ptr); }
void operator delete[](void * ptr) noexcept { Memory::deleteImpl(ptr); }
void operator delete(void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
void operator delete[](void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
void operator delete(void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
void operator delete[](void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
#endif
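A minimal self-contained sketch of the same override pattern, assuming a plain atomic counter in place of MemoryTracker and no jemalloc-based sizing (illustration only, not the ClickHouse implementation):

#include <atomic>
#include <cstdio>
#include <cstdlib>
#include <new>

static std::atomic<std::size_t> tracked_bytes{0};

void * operator new(std::size_t size)
{
    tracked_bytes.fetch_add(size, std::memory_order_relaxed);
    if (void * ptr = std::malloc(size))
        return ptr;
    throw std::bad_alloc{};
}

void operator delete(void * ptr, std::size_t size) noexcept
{
    tracked_bytes.fetch_sub(size, std::memory_order_relaxed);
    std::free(ptr);
}

// Unsized fallback: the size is unknown here, so the counter is not adjusted.
void operator delete(void * ptr) noexcept
{
    std::free(ptr);
}

int main()
{
    auto * p = new int{42};
    std::printf("tracked: %zu bytes\n", tracked_bytes.load());
    delete p;  // compilers typically call the sized overload here
    return 0;
}

The nothrow paths in the real file go through trackMemoryNoExept for the same reason: a noexcept operator new must report failure by returning nullptr rather than letting the tracker's exception propagate.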

View File

@ -23,10 +23,10 @@ add_executable (small_table small_table.cpp)
target_link_libraries (small_table PRIVATE clickhouse_common_io)
add_executable (parallel_aggregation parallel_aggregation.cpp)
target_link_libraries (parallel_aggregation PRIVATE clickhouse_compression clickhouse_common_io)
target_link_libraries (parallel_aggregation PRIVATE dbms)
add_executable (parallel_aggregation2 parallel_aggregation2.cpp)
target_link_libraries (parallel_aggregation2 PRIVATE clickhouse_compression clickhouse_common_io)
target_link_libraries (parallel_aggregation2 PRIVATE dbms)
add_executable (int_hashes_perf int_hashes_perf.cpp AvalancheTest.cpp Random.cpp)
target_link_libraries (int_hashes_perf PRIVATE clickhouse_common_io)
@ -42,7 +42,7 @@ add_executable (radix_sort radix_sort.cpp)
target_link_libraries (radix_sort PRIVATE clickhouse_common_io)
add_executable (arena_with_free_lists arena_with_free_lists.cpp)
target_link_libraries (arena_with_free_lists PRIVATE clickhouse_compression clickhouse_common_io)
target_link_libraries (arena_with_free_lists PRIVATE dbms)
add_executable (pod_array pod_array.cpp)
target_link_libraries (pod_array PRIVATE clickhouse_common_io)
@ -62,7 +62,7 @@ target_link_libraries (space_saving PRIVATE clickhouse_common_io)
add_executable (integer_hash_tables_and_hashes integer_hash_tables_and_hashes.cpp)
target_include_directories (integer_hash_tables_and_hashes SYSTEM BEFORE PRIVATE ${SPARCEHASH_INCLUDE_DIR})
target_link_libraries (integer_hash_tables_and_hashes PRIVATE clickhouse_compression clickhouse_common_io)
target_link_libraries (integer_hash_tables_and_hashes PRIVATE dbms)
add_executable (allocator allocator.cpp)
target_link_libraries (allocator PRIVATE clickhouse_common_io)
@ -75,3 +75,6 @@ target_link_libraries (cow_compositions PRIVATE clickhouse_common_io)
add_executable (stopwatch stopwatch.cpp)
target_link_libraries (stopwatch PRIVATE clickhouse_common_io)
add_executable (mi_malloc_test mi_malloc_test.cpp)
target_link_libraries (mi_malloc_test PRIVATE clickhouse_common_io)

View File

@ -1,8 +1,3 @@
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
#include <Common/RWLock.h>

View File

@ -9,11 +9,6 @@
#include <chrono>
#include <thread>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>

View File

@ -1,10 +1,5 @@
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/** Reproduces a bug in ThreadPool.

View File

@ -2,11 +2,6 @@
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>
/// Test for thread self-removal when the number of free threads in the pool is too large.

View File

@ -2,11 +2,6 @@
#include <iostream>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>

View File

@ -2,11 +2,6 @@
#include <stdexcept>
#include <Common/ThreadPool.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>

View File

@ -1,10 +1,5 @@
#include <Common/escapeForFileName.h>
#pragma GCC diagnostic ignored "-Wsign-compare"
#ifdef __clang__
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
#pragma clang diagnostic ignored "-Wundef"
#endif
#include <gtest/gtest.h>

View File

@ -0,0 +1,118 @@
/** In addition to the ClickHouse (Apache 2) license, this file can also be used under the MIT license:
MIT License
Copyright (c) 2019 Yandex LLC, Alexey Milovidov
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
*/
#include <map>
#include <vector>
#include <cstdint>
#include <random>
#include <stdexcept>
#include <iostream>
#include <Common/config.h>
//#undef USE_MIMALLOC
//#define USE_MIMALLOC 0
#if USE_MIMALLOC
#include <mimalloc.h>
#define malloc mi_malloc
#define free mi_free
#else
#include <stdlib.h>
#endif
size_t total_size{0};
struct Allocation
{
void * ptr = nullptr;
size_t size = 0;
Allocation() {}
Allocation(size_t size)
: size(size)
{
ptr = malloc(size);
if (!ptr)
throw std::runtime_error("Cannot allocate memory");
total_size += size;
}
~Allocation()
{
if (ptr)
{
free(ptr);
total_size -= size;
}
ptr = nullptr;
}
Allocation(const Allocation &) = delete;
Allocation(Allocation && rhs)
{
ptr = rhs.ptr;
size = rhs.size;
rhs.ptr = nullptr;
rhs.size = 0;
}
};
int main(int, char **)
{
std::vector<Allocation> allocations;
constexpr size_t limit = 100000000;
constexpr size_t min_alloc_size = 65536;
constexpr size_t max_alloc_size = 10000000;
std::mt19937 rng;
auto distribution = std::uniform_int_distribution(min_alloc_size, max_alloc_size);
size_t total_allocations = 0;
while (true)
{
size_t size = distribution(rng);
while (total_size + size > limit)
allocations.pop_back();
allocations.emplace_back(size);
++total_allocations;
if (total_allocations % (1ULL << 20) == 0)
std::cerr << "Total allocations: " << total_allocations << "\n";
}
}

View File

@ -34,7 +34,7 @@ struct AggregateIndependent
{
results.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
results.emplace_back(new Map);
results.emplace_back(std::make_unique<Map>());
for (size_t i = 0; i < num_threads; ++i)
{
@ -77,7 +77,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
{
results.reserve(num_threads);
for (size_t i = 0; i < num_threads; ++i)
results.emplace_back(new Map);
results.emplace_back(std::make_unique<Map>());
for (size_t i = 0; i < num_threads; ++i)
{
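A short hedged sketch of why this change matters (hypothetical Map stand-in, not the benchmark's type): with emplace_back(new Map), the raw pointer exists before the vector grows, so an allocation failure during growth leaks it; make_unique establishes ownership first.

#include <memory>
#include <vector>

struct Map { int payload[16] = {}; };  // hypothetical stand-in

int main()
{
    std::vector<std::unique_ptr<Map>> results;
    // results.emplace_back(new Map);  // leaks the Map if vector growth throws
    results.emplace_back(std::make_unique<Map>());  // ownership taken before insertion
    return 0;
}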

View File

@ -1,21 +1,3 @@
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_compression .)
add_library(clickhouse_compression ${clickhouse_compression_headers} ${clickhouse_compression_sources})
target_link_libraries(clickhouse_compression PRIVATE clickhouse_parsers clickhouse_common_io ${LZ4_LIBRARY} ${CITYHASH_LIBRARIES})
if(ZSTD_LIBRARY)
target_link_libraries(clickhouse_compression PRIVATE ${ZSTD_LIBRARY})
endif()
target_include_directories(clickhouse_compression PUBLIC ${DBMS_INCLUDE_DIR})
target_include_directories(clickhouse_compression SYSTEM PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories(clickhouse_compression SYSTEM BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
target_include_directories(clickhouse_compression SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
if(ENABLE_TESTS)
add_subdirectory(tests)
endif()

View File

@ -12,6 +12,7 @@
#include <algorithm>
#include <cstdlib>
#include <type_traits>
#include <limits>
namespace DB
{
@ -24,28 +25,23 @@ extern const int CANNOT_DECOMPRESS;
namespace
{
UInt32 getDeltaTypeByteSize(UInt8 data_bytes_size)
Int64 getMaxValueForByteSize(UInt8 byte_size)
{
// Both delta and double delta can be twice the size of the data item, but not less than 32 bits and not more than 64.
return std::min(64/8, std::max(32/8, data_bytes_size * 2));
}
UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
{
const UInt8 items_count_size = 4;
return items_count_size + data_bytes_size + getDeltaTypeByteSize(data_bytes_size);
}
UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
{
const UInt32 items_count = uncompressed_size / data_bytes_size;
// 11111 + max 64 bits of double delta.
const UInt32 max_item_size_bits = 5 + getDeltaTypeByteSize(data_bytes_size) * 8;
// + 8 is to round up to next byte.
return (items_count * max_item_size_bits + 8) / 8;
switch (byte_size)
{
case sizeof(UInt8):
return std::numeric_limits<Int8>::max();
case sizeof(UInt16):
return std::numeric_limits<Int16>::max();
case sizeof(UInt32):
return std::numeric_limits<Int32>::max();
case sizeof(UInt64):
return std::numeric_limits<Int64>::max();
default:
assert(false && "only 1, 2, 4 and 8 data sizes are supported");
}
__builtin_unreachable();
}
struct WriteSpec
@ -55,8 +51,10 @@ struct WriteSpec
const UInt8 data_bits;
};
const std::array<UInt8, 5> DELTA_SIZES{7, 9, 12, 32, 64};
template <typename T>
WriteSpec getWriteSpec(const T & value)
WriteSpec getDeltaWriteSpec(const T & value)
{
if (value > -63 && value < 64)
{
@ -80,27 +78,60 @@ WriteSpec getWriteSpec(const T & value)
}
}
template <typename T, typename DeltaType>
WriteSpec getDeltaMaxWriteSpecByteSize(UInt8 data_bytes_size)
{
return getDeltaWriteSpec(getMaxValueForByteSize(data_bytes_size));
}
UInt32 getCompressedHeaderSize(UInt8 data_bytes_size)
{
const UInt8 items_count_size = 4;
const UInt8 first_delta_bytes_size = data_bytes_size;
return items_count_size + data_bytes_size + first_delta_bytes_size;
}
UInt32 getCompressedDataSize(UInt8 data_bytes_size, UInt32 uncompressed_size)
{
const UInt32 items_count = uncompressed_size / data_bytes_size;
const auto double_delta_write_spec = getDeltaMaxWriteSpecByteSize(data_bytes_size);
const UInt32 max_item_size_bits = double_delta_write_spec.prefix_bits + double_delta_write_spec.data_bits;
// + 7 rounds up to the next whole byte.
auto result = (items_count * max_item_size_bits + 7) / 8;
return result;
}
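// A hedged aside (illustrative, not part of this file): "+ 7, then / 8" is
// the usual round-up-to-whole-bytes trick; the removed "+ 8" variant wasted
// one byte whenever the bit total was already byte-aligned.
static_assert((37 + 7) / 8 == 5, "37 bits occupy 5 whole bytes");
static_assert((40 + 7) / 8 == 5, "40 bits occupy exactly 5 bytes");
static_assert((40 + 8) / 8 == 6, "the old formula over-counted here");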
template <typename ValueType>
UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
{
static_assert(std::is_unsigned_v<T> && std::is_signed_v<DeltaType>, "T must be unsigned, while DeltaType must be signed integer type.");
using UnsignedDeltaType = typename std::make_unsigned<DeltaType>::type;
// Since only unsigned integers have guaranteed two's-complement overflow behavior, we do the math here on unsigned types.
// To simplify and bulletproof the code, we enforce ValueType to be unsigned too.
static_assert(std::is_unsigned_v<ValueType>, "ValueType must be unsigned.");
using UnsignedDeltaType = ValueType;
if (source_size % sizeof(T) != 0)
throw Exception("Cannot compress, data size " + toString(source_size) + " is not aligned to " + toString(sizeof(T)), ErrorCodes::CANNOT_COMPRESS);
// We use a signed delta type to turn huge unsigned values into small signed ones:
// ffffffff => -1
using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type;
if (source_size % sizeof(ValueType) != 0)
throw Exception("Cannot compress, data size " + toString(source_size)
+ " is not aligned to " + toString(sizeof(ValueType)), ErrorCodes::CANNOT_COMPRESS);
const char * source_end = source + source_size;
const UInt32 items_count = source_size / sizeof(T);
const UInt32 items_count = source_size / sizeof(ValueType);
unalignedStore<UInt32>(dest, items_count);
dest += sizeof(items_count);
T prev_value{};
DeltaType prev_delta{};
ValueType prev_value{};
UnsignedDeltaType prev_delta{};
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore<T>(dest, prev_value);
prev_value = unalignedLoad<ValueType>(source);
unalignedStore<ValueType>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -108,24 +139,26 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
const T curr_value = unalignedLoad<T>(source);
prev_delta = static_cast<DeltaType>(curr_value - prev_value);
unalignedStore<DeltaType>(dest, prev_delta);
const ValueType curr_value = unalignedLoad<ValueType>(source);
prev_delta = curr_value - prev_value;
unalignedStore<UnsignedDeltaType>(dest, prev_delta);
source += sizeof(curr_value);
dest += sizeof(prev_delta);
prev_value = curr_value;
}
WriteBuffer buffer(dest, getCompressedDataSize(sizeof(T), source_size - sizeof(T)*2));
WriteBuffer buffer(dest, getCompressedDataSize(sizeof(ValueType), source_size - sizeof(ValueType)*2));
BitWriter writer(buffer);
for (; source < source_end; source += sizeof(T))
int item = 2;
for (; source < source_end; source += sizeof(ValueType), ++item)
{
const T curr_value = unalignedLoad<T>(source);
const ValueType curr_value = unalignedLoad<ValueType>(source);
const DeltaType delta = static_cast<DeltaType>(curr_value - prev_value);
const DeltaType double_delta = delta - prev_delta;
const UnsignedDeltaType delta = curr_value - prev_value;
const UnsignedDeltaType double_delta = delta - prev_delta;
prev_delta = delta;
prev_value = curr_value;
@ -136,9 +169,11 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
}
else
{
const auto sign = std::signbit(double_delta);
const auto abs_value = static_cast<UnsignedDeltaType>(std::abs(double_delta));
const auto write_spec = getWriteSpec(double_delta);
const SignedDeltaType signed_dd = static_cast<SignedDeltaType>(double_delta);
const auto sign = std::signbit(signed_dd);
// The -1 shrinks dd to fit into the available bits; dd cannot be 0 here, so it is safe.
const auto abs_value = static_cast<UnsignedDeltaType>(std::abs(signed_dd) - 1);
const auto write_spec = getDeltaWriteSpec(signed_dd);
writer.writeBits(write_spec.prefix_bits, write_spec.prefix);
writer.writeBits(1, sign);
@ -151,22 +186,25 @@ UInt32 compressDataForType(const char * source, UInt32 source_size, char * dest)
return sizeof(items_count) + sizeof(prev_value) + sizeof(prev_delta) + buffer.count();
}
template <typename T, typename DeltaType>
template <typename ValueType>
void decompressDataForType(const char * source, UInt32 source_size, char * dest)
{
static_assert(std::is_unsigned_v<T> && std::is_signed_v<DeltaType>, "T must be unsigned, while DeltaType must be signed integer type.");
static_assert(std::is_unsigned_v<ValueType>, "ValueType must be unsigned.");
using UnsignedDeltaType = ValueType;
using SignedDeltaType = typename std::make_signed<UnsignedDeltaType>::type;
const char * source_end = source + source_size;
const UInt32 items_count = unalignedLoad<UInt32>(source);
source += sizeof(items_count);
T prev_value{};
DeltaType prev_delta{};
ValueType prev_value{};
UnsignedDeltaType prev_delta{};
if (source < source_end)
{
prev_value = unalignedLoad<T>(source);
unalignedStore<T>(dest, prev_value);
prev_value = unalignedLoad<ValueType>(source);
unalignedStore<ValueType>(dest, prev_value);
source += sizeof(prev_value);
dest += sizeof(prev_value);
@ -174,9 +212,9 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
if (source < source_end)
{
prev_delta = unalignedLoad<DeltaType>(source);
prev_value = prev_value + static_cast<T>(prev_delta);
unalignedStore<T>(dest, prev_value);
prev_delta = unalignedLoad<UnsignedDeltaType>(source);
prev_value = prev_value + static_cast<ValueType>(prev_delta);
unalignedStore<ValueType>(dest, prev_value);
source += sizeof(prev_delta);
dest += sizeof(prev_value);
@ -189,32 +227,35 @@ void decompressDataForType(const char * source, UInt32 source_size, char * dest)
// We have to keep track of the item count to avoid reading more than there is.
for (UInt32 items_read = 2; items_read < items_count && !reader.eof(); ++items_read)
{
DeltaType double_delta = 0;
UnsignedDeltaType double_delta = 0;
if (reader.readBit() == 1)
{
const UInt8 data_sizes[] = {6, 8, 11, 31, 63};
UInt8 i = 0;
for (; i < sizeof(data_sizes) - 1; ++i)
for (; i < sizeof(DELTA_SIZES) - 1; ++i)
{
const auto next_bit = reader.readBit();
if (next_bit == 0)
{
break;
}
}
const UInt8 sign = reader.readBit();
double_delta = static_cast<DeltaType>(reader.readBits(data_sizes[i]));
SignedDeltaType signed_dd = static_cast<SignedDeltaType>(reader.readBits(DELTA_SIZES[i] - 1) + 1);
if (sign)
{
double_delta *= -1;
signed_dd *= -1;
}
double_delta = static_cast<UnsignedDeltaType>(signed_dd);
}
// Otherwise the first bit was zero and there is no need to read more data.
const T curr_value = prev_value + static_cast<T>(prev_delta + double_delta);
unalignedStore<T>(dest, curr_value);
const UnsignedDeltaType delta = double_delta + prev_delta;
const ValueType curr_value = prev_value + delta;
unalignedStore<ValueType>(dest, curr_value);
dest += sizeof(curr_value);
prev_delta = static_cast<DeltaType>(curr_value - prev_value);
prev_delta = curr_value - prev_value;
prev_value = curr_value;
}
}
@ -267,19 +308,20 @@ UInt32 CompressionCodecDoubleDelta::doCompressData(const char * source, UInt32 s
memcpy(&dest[2], source, bytes_to_skip);
size_t start_pos = 2 + bytes_to_skip;
UInt32 compressed_size = 0;
switch (data_bytes_size)
{
case 1:
compressed_size = compressDataForType<UInt8, Int16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
compressed_size = compressDataForType<UInt8>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
break;
case 2:
compressed_size = compressDataForType<UInt16, Int32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
compressed_size = compressDataForType<UInt16>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
break;
case 4:
compressed_size = compressDataForType<UInt32, Int64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
compressed_size = compressDataForType<UInt32>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
break;
case 8:
compressed_size = compressDataForType<UInt64, Int64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
compressed_size = compressDataForType<UInt64>(&source[bytes_to_skip], source_size - bytes_to_skip, &dest[start_pos]);
break;
}
@ -296,16 +338,16 @@ void CompressionCodecDoubleDelta::doDecompressData(const char * source, UInt32 s
switch (bytes_size)
{
case 1:
decompressDataForType<UInt8, Int16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
decompressDataForType<UInt8>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
break;
case 2:
decompressDataForType<UInt16, Int32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
decompressDataForType<UInt16>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
break;
case 4:
decompressDataForType<UInt32, Int64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
decompressDataForType<UInt32>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
break;
case 8:
decompressDataForType<UInt64, Int64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
decompressDataForType<UInt64>(&source[2 + bytes_to_skip], source_size_no_header, &dest[bytes_to_skip]);
break;
}
}
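To make the unsigned arithmetic described in the comments above concrete, here is a hedged standalone illustration (not part of the commit): first and second differences are computed on unsigned types, and two's-complement wraparound makes a negative double delta come out correctly when reinterpreted as signed.

#include <cstddef>
#include <cstdint>
#include <cstdio>

int main()
{
    const std::uint32_t values[] = {1000, 1010, 1021, 1031, 1042};
    std::uint32_t prev_value = values[0];
    std::uint32_t prev_delta = 0;
    for (std::size_t i = 1; i < 5; ++i)
    {
        const std::uint32_t delta = values[i] - prev_value;     // first difference
        const std::uint32_t double_delta = delta - prev_delta;  // second difference, may wrap
        std::printf("value=%u delta=%u dd=%d\n",
                    values[i], delta, static_cast<std::int32_t>(double_delta));
        prev_delta = delta;
        prev_value = values[i];
    }
    return 0;  // prints dd = 10, 1, -1, 1 for this series
}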

Some files were not shown because too many files have changed in this diff.