Merge remote-tracking branch 'origin' into integration--7

Yatsishin Ilya 2021-10-12 09:35:48 +03:00
commit 5f7c53923d
230 changed files with 3389 additions and 1825 deletions

.gitmodules vendored

@ -249,3 +249,6 @@
[submodule "contrib/magic_enum"]
path = contrib/magic_enum
url = https://github.com/Neargye/magic_enum
[submodule "contrib/sysroot"]
path = contrib/sysroot
url = https://github.com/ClickHouse-Extras/sysroot.git


@ -1,4 +1,4 @@
### ClickHouse release v21.10, 2021-10-08
### ClickHouse release v21.10, 2021-10-14
#### Backward Incompatible Change
@ -110,6 +110,7 @@
* Fix the issue that, for some sophisticated queries with column aliases identical to the names of expressions, a bad cast could happen. This fixes [#25447](https://github.com/ClickHouse/ClickHouse/issues/25447). This fixes [#26914](https://github.com/ClickHouse/ClickHouse/issues/26914). This fix may introduce backward incompatibility: if there are different expressions with identical names, an exception will be thrown. It may break some rare cases when `enable_optimize_predicate_expression` is set. [#26639](https://github.com/ClickHouse/ClickHouse/pull/26639) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Now a scalar subquery always returns a `Nullable` result if its type can be `Nullable`. This is needed because, for an empty subquery, the result should be `Null`. Previously, it was possible to get an error about incompatible types (type deduction does not execute the scalar subquery, and it could use a non-nullable type). A scalar subquery with an empty result that can't be converted to `Nullable` (like `Array` or `Tuple`) now throws an error. Fixes [#25411](https://github.com/ClickHouse/ClickHouse/issues/25411). [#26423](https://github.com/ClickHouse/ClickHouse/pull/26423) ([Nikolai Kochetov](https://github.com/KochetovNicolai)).
* Introduce syntax for here documents. Example: `SELECT $doc$ VALUE $doc$`. [#26671](https://github.com/ClickHouse/ClickHouse/pull/26671) ([Maksim Kita](https://github.com/kitaisreal)). This change is backward incompatible if a query contains identifiers that contain `$` [#28768](https://github.com/ClickHouse/ClickHouse/issues/28768).
* Now indices can handle Nullable types, including `isNull` and `isNotNull`. [#12433](https://github.com/ClickHouse/ClickHouse/pull/12433) and [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)) and [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)). This required on-disk format changes: even though the new server can read old data, the old server cannot. Also, if you have `MINMAX` data skipping indices, you may get a `Data after mutation/merge is not byte-identical` error, since the new index has the `.idx2` extension while the old one had `.idx`. This means you should not delay updating all existing replicas: if an old replica (<21.9) downloads data from a new replica (21.9+), it will not be able to apply the index for the downloaded part.
#### New Feature
@ -179,7 +180,6 @@
* Add setting `log_formatted_queries` to log additional formatted query into `system.query_log`. It's useful for normalized query analysis because functions like `normalizeQuery` and `normalizeQueryKeepNames` don't parse/format queries in order to achieve better performance. [#27380](https://github.com/ClickHouse/ClickHouse/pull/27380) ([Amos Bird](https://github.com/amosbird)).
* Add two settings `max_hyperscan_regexp_length` and `max_hyperscan_regexp_total_length` to prevent huge regexps from being used in hyperscan-related functions, such as `multiMatchAny`. [#27378](https://github.com/ClickHouse/ClickHouse/pull/27378) ([Amos Bird](https://github.com/amosbird)).
* Memory consumed by bitmap aggregate functions now is taken into account for memory limits. This closes [#26555](https://github.com/ClickHouse/ClickHouse/issues/26555). [#27252](https://github.com/ClickHouse/ClickHouse/pull/27252) ([alexey-milovidov](https://github.com/alexey-milovidov)).
* Add a new data skipping minmax index format for proper Nullable support. [#27250](https://github.com/ClickHouse/ClickHouse/pull/27250) ([Azat Khuzhin](https://github.com/azat)).
* Add 10 seconds cache for S3 proxy resolver. [#27216](https://github.com/ClickHouse/ClickHouse/pull/27216) ([ianton-ru](https://github.com/ianton-ru)).
* Split the global mutex into per-regexp construction locks. This helps avoid huge regexp construction blocking other related threads. [#27211](https://github.com/ClickHouse/ClickHouse/pull/27211) ([Amos Bird](https://github.com/amosbird)).
* Support schema for PostgreSQL database engine. Closes [#27166](https://github.com/ClickHouse/ClickHouse/issues/27166). [#27198](https://github.com/ClickHouse/ClickHouse/pull/27198) ([Kseniia Sumarokova](https://github.com/kssenii)).
@ -234,7 +234,6 @@
* Fix multiple block insertion into distributed table with `insert_distributed_one_random_shard = 1`. This is a marginal feature. Mark as improvement. [#23140](https://github.com/ClickHouse/ClickHouse/pull/23140) ([Amos Bird](https://github.com/amosbird)).
* Support `LowCardinality` and `FixedString` keys/values for `Map` type. [#21543](https://github.com/ClickHouse/ClickHouse/pull/21543) ([hexiaoting](https://github.com/hexiaoting)).
* Enable reloading of local disk config. [#19526](https://github.com/ClickHouse/ClickHouse/pull/19526) ([taiyang-li](https://github.com/taiyang-li)).
* Now KeyConditions can correctly skip nullable keys, including `isNull` and `isNotNull`. https://github.com/ClickHouse/ClickHouse/pull/12433. [#12455](https://github.com/ClickHouse/ClickHouse/pull/12455) ([Amos Bird](https://github.com/amosbird)).
#### Bug Fix


@ -336,6 +336,10 @@ if (COMPILER_GCC OR COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -falign-functions=32")
endif ()
if (COMPILER_GCC)
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fcoroutines")
endif ()
# Compiler-specific coverage flags e.g. -fcoverage-mapping for gcc
option(WITH_COVERAGE "Profile the resulting binary/binaries" OFF)


@ -1,4 +1,4 @@
#include <cstddef>
#include <stddef.h>
#include <emmintrin.h>


@ -13,7 +13,6 @@ endif ()
if ((ARCH_ARM AND NOT ARCH_AARCH64) OR ARCH_I386)
message (FATAL_ERROR "32bit platforms are not supported")
endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(ppc64le.*|PPC64LE.*)")
set (ARCH_PPC64LE 1)
endif ()


@ -1,14 +1,7 @@
if (CMAKE_VERSION VERSION_GREATER_EQUAL "3.12")
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} CONFIGURE_DEPENDS ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
else ()
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
endif ()
macro(add_glob cur_list)
file(GLOB __tmp RELATIVE ${CMAKE_CURRENT_SOURCE_DIR} ${ARGN})
list(APPEND ${cur_list} ${__tmp})
endmacro()
macro(add_headers_and_sources prefix common_path)
add_glob(${prefix}_headers ${CMAKE_CURRENT_SOURCE_DIR} ${common_path}/*.h)


@ -64,6 +64,7 @@ if (NOT OPENLDAP_FOUND AND NOT MISSING_INTERNAL_LDAP_LIBRARY)
( "${_system_name}" STREQUAL "linux" AND "${_system_processor}" STREQUAL "aarch64" ) OR
( "${_system_name}" STREQUAL "linux" AND "${_system_processor}" STREQUAL "ppc64le" ) OR
( "${_system_name}" STREQUAL "freebsd" AND "${_system_processor}" STREQUAL "x86_64" ) OR
( "${_system_name}" STREQUAL "freebsd" AND "${_system_processor}" STREQUAL "aarch64" ) OR
( "${_system_name}" STREQUAL "darwin" AND "${_system_processor}" STREQUAL "x86_64" ) OR
( "${_system_name}" STREQUAL "darwin" AND "${_system_processor}" STREQUAL "aarch64" )
)


@ -0,0 +1,22 @@
set (CMAKE_SYSTEM_NAME "FreeBSD")
set (CMAKE_SYSTEM_PROCESSOR "aarch64")
set (CMAKE_C_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_CXX_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_ASM_COMPILER_TARGET "aarch64-unknown-freebsd12")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../toolchain/freebsd-aarch64")
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY) # disable linkage check - it doesn't work in CMake
set (CMAKE_AR "/usr/bin/ar" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "/usr/bin/ranlib" CACHE FILEPATH "" FORCE)
set (LINKER_NAME "ld.lld" CACHE STRING "" FORCE)
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (CMAKE_SHARED_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)


@ -1,17 +1,24 @@
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY)
set (CMAKE_SYSTEM_NAME "Linux")
set (CMAKE_SYSTEM_PROCESSOR "aarch64")
set (CMAKE_C_COMPILER_TARGET "aarch64-linux-gnu")
set (CMAKE_CXX_COMPILER_TARGET "aarch64-linux-gnu")
set (CMAKE_ASM_COMPILER_TARGET "aarch64-linux-gnu")
set (CMAKE_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64/aarch64-linux-gnu/libc")
# We don't use compiler from toolchain because it's gcc-8, and we provide support only for gcc-9.
set (CMAKE_AR "${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64/bin/aarch64-linux-gnu-ar" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64/bin/aarch64-linux-gnu-ranlib" CACHE FILEPATH "" FORCE)
set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-aarch64")
set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64")
set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64")
set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${CMAKE_CURRENT_LIST_DIR}/../toolchain/linux-aarch64")
set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}/aarch64-linux-gnu/libc")
find_program (LLVM_AR_PATH NAMES "llvm-ar" "llvm-ar-13" "llvm-ar-12" "llvm-ar-11" "llvm-ar-10" "llvm-ar-9" "llvm-ar-8")
find_program (LLVM_RANLIB_PATH NAMES "llvm-ranlib" "llvm-ranlib-13" "llvm-ranlib-12" "llvm-ranlib-11" "llvm-ranlib-10" "llvm-ranlib-9")
set (CMAKE_AR "${LLVM_AR_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "${LLVM_RANLIB_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (LINKER_NAME "ld.lld" CACHE STRING "" FORCE)


@ -0,0 +1,32 @@
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY)
set (CMAKE_SYSTEM_NAME "Linux")
set (CMAKE_SYSTEM_PROCESSOR "x86_64")
set (CMAKE_C_COMPILER_TARGET "x86_64-linux-gnu")
set (CMAKE_CXX_COMPILER_TARGET "x86_64-linux-gnu")
set (CMAKE_ASM_COMPILER_TARGET "x86_64-linux-gnu")
set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-x86_64")
set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}/x86_64-linux-gnu/libc")
find_program (LLVM_AR_PATH NAMES "llvm-ar" "llvm-ar-13" "llvm-ar-12" "llvm-ar-11" "llvm-ar-10" "llvm-ar-9" "llvm-ar-8")
find_program (LLVM_RANLIB_PATH NAMES "llvm-ranlib" "llvm-ranlib-13" "llvm-ranlib-12" "llvm-ranlib-11" "llvm-ranlib-10" "llvm-ranlib-9")
set (CMAKE_AR "${LLVM_AR_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "${LLVM_RANLIB_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (LINKER_NAME "ld.lld" CACHE STRING "" FORCE)
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (CMAKE_SHARED_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)


@ -1,2 +0,0 @@
wget 'https://developer.arm.com/-/media/Files/downloads/gnu-a/8.3-2019.03/binrel/gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz?revision=2e88a73f-d233-4f96-b1f4-d8b36e9bb0b9&la=en' -O gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz
tar xJf gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz --strip-components=1


@ -89,4 +89,3 @@ if (LINKER_NAME)
message(STATUS "Using custom linker by name: ${LINKER_NAME}")
endif ()


@ -163,7 +163,7 @@ endif ()
if(USE_INTERNAL_SNAPPY_LIBRARY)
set(SNAPPY_BUILD_TESTS 0 CACHE INTERNAL "")
add_subdirectory(snappy)
add_subdirectory(snappy-cmake)
set (SNAPPY_INCLUDE_DIR "${ClickHouse_SOURCE_DIR}/contrib/snappy")
endif()
@ -215,6 +215,7 @@ function(add_llvm)
# Do not adjust RPATH in llvm, since then it will not be able to find libcxx/libcxxabi/libunwind
set (CMAKE_INSTALL_RPATH "ON")
set (LLVM_COMPILER_CHECKED 1 CACHE INTERNAL "")
set (LLVM_ENABLE_EH 1 CACHE INTERNAL "")
set (LLVM_ENABLE_RTTI 1 CACHE INTERNAL "")
set (LLVM_ENABLE_PIC 0 CACHE INTERNAL "")


@ -54,7 +54,7 @@ target_link_libraries (${THRIFT_LIBRARY} PRIVATE boost::headers_only)
set(ORC_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/orc/c++")
set(ORC_INCLUDE_DIR "${ORC_SOURCE_DIR}/include")
set(ORC_SOURCE_SRC_DIR "${ORC_SOURCE_DIR}/src")
set(ORC_SOURCE_WRAP_DIR "${ORC_SOURCE_DIR}/wrap")
# set(ORC_SOURCE_WRAP_DIR "${ORC_SOURCE_DIR}/wrap")
set(ORC_BUILD_SRC_DIR "${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/src")
set(ORC_BUILD_INCLUDE_DIR "${CMAKE_CURRENT_BINARY_DIR}/../orc/c++/include")
@ -101,7 +101,14 @@ if (CMAKE_CXX_COMPILER_ID STREQUAL "AppleClang")
set(CXX11_FLAGS "-std=c++0x")
endif ()
include("${ClickHouse_SOURCE_DIR}/contrib/orc/cmake_modules/CheckSourceCompiles.cmake")
set (ORC_CXX_HAS_INITIALIZER_LIST 1)
set (ORC_CXX_HAS_NOEXCEPT 1)
set (ORC_CXX_HAS_NULLPTR 1)
set (ORC_CXX_HAS_OVERRIDE 1)
set (ORC_CXX_HAS_UNIQUE_PTR 1)
set (ORC_CXX_HAS_CSTDINT 1)
set (ORC_CXX_HAS_THREAD_LOCAL 1)
include(orc_check.cmake)
configure_file("${ORC_INCLUDE_DIR}/orc/orc-config.hh.in" "${ORC_BUILD_INCLUDE_DIR}/orc/orc-config.hh")
configure_file("${ORC_SOURCE_SRC_DIR}/Adaptor.hh.in" "${ORC_BUILD_INCLUDE_DIR}/Adaptor.hh")


@ -1,130 +1,14 @@
# Not changed part of contrib/orc/c++/src/CMakeLists.txt
set (HAS_PREAD 1)
set (HAS_STRPTIME 1)
set (HAS_STOLL 1)
set (INT64_IS_LL 1)
set (HAS_DIAGNOSTIC_PUSH 1)
set (HAS_STD_ISNAN 1)
set (HAS_STD_MUTEX 1)
set (NEEDS_REDUNDANT_MOVE 1)
set (HAS_PRE_1970 1)
set (HAS_POST_2038 1)
set (NEEDS_Z_PREFIX 0)
INCLUDE(CheckCXXSourceCompiles)
CHECK_CXX_SOURCE_COMPILES("
#include<fcntl.h>
#include<unistd.h>
int main(int,char*[]){
int f = open(\"/x/y\", O_RDONLY);
char buf[100];
return pread(f, buf, 100, 1000) == 0;
}"
HAS_PREAD
)
CHECK_CXX_SOURCE_COMPILES("
#include<time.h>
int main(int,char*[]){
struct tm time2020;
return !strptime(\"2020-02-02 12:34:56\", \"%Y-%m-%d %H:%M:%S\", &time2020);
}"
HAS_STRPTIME
)
CHECK_CXX_SOURCE_COMPILES("
#include<string>
int main(int,char* argv[]){
return static_cast<int>(std::stoll(argv[0]));
}"
HAS_STOLL
)
CHECK_CXX_SOURCE_COMPILES("
#include<stdint.h>
#include<stdio.h>
int main(int,char*[]){
int64_t x = 1; printf(\"%lld\",x);
}"
INT64_IS_LL
)
CHECK_CXX_SOURCE_COMPILES("
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored \"-Wdeprecated\"
#pragma clang diagnostic pop
#elif defined(__GNUC__)
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored \"-Wdeprecated\"
#pragma GCC diagnostic pop
#elif defined(_MSC_VER)
#pragma warning( push )
#pragma warning( disable : 4996 )
#pragma warning( pop )
#else
unknownCompiler!
#endif
int main(int, char *[]) {}"
HAS_DIAGNOSTIC_PUSH
)
CHECK_CXX_SOURCE_COMPILES("
#include<cmath>
int main(int, char *[]) {
return std::isnan(1.0f);
}"
HAS_STD_ISNAN
)
CHECK_CXX_SOURCE_COMPILES("
#include<mutex>
int main(int, char *[]) {
std::mutex test_mutex;
std::lock_guard<std::mutex> lock_mutex(test_mutex);
}"
HAS_STD_MUTEX
)
CHECK_CXX_SOURCE_COMPILES("
#include<string>
std::string func() {
std::string var = \"test\";
return std::move(var);
}
int main(int, char *[]) {}"
NEEDS_REDUNDANT_MOVE
)
INCLUDE(CheckCXXSourceRuns)
CHECK_CXX_SOURCE_RUNS("
#include<time.h>
int main(int, char *[]) {
time_t t = -14210715; // 1969-07-20 12:34:45
struct tm *ptm = gmtime(&t);
return !(ptm && ptm->tm_year == 69);
}"
HAS_PRE_1970
)
CHECK_CXX_SOURCE_RUNS("
#include<stdlib.h>
#include<time.h>
int main(int, char *[]) {
setenv(\"TZ\", \"America/Los_Angeles\", 1);
tzset();
struct tm time2037;
struct tm time2038;
strptime(\"2037-05-05 12:34:56\", \"%Y-%m-%d %H:%M:%S\", &time2037);
strptime(\"2038-05-05 12:34:56\", \"%Y-%m-%d %H:%M:%S\", &time2038);
return mktime(&time2038) - mktime(&time2037) != 31536000;
}"
HAS_POST_2038
)
set(CMAKE_REQUIRED_INCLUDES ${ZLIB_INCLUDE_DIR})
set(CMAKE_REQUIRED_LIBRARIES zlib)
CHECK_CXX_SOURCE_COMPILES("
#define Z_PREFIX
#include<zlib.h>
z_stream strm;
int main(int, char *[]) {
deflateReset(&strm);
}"
NEEDS_Z_PREFIX
)
# See https://cmake.org/cmake/help/v3.14/policy/CMP0075.html. Without unsetting it breaks thrift.
set(CMAKE_REQUIRED_INCLUDES)
set(CMAKE_REQUIRED_LIBRARIES)

contrib/cctz vendored

@ -1 +1 @@
Subproject commit c0f1bcb97fd2782f7c3f972fadd5aad5affac4b8
Subproject commit 9edd0861d8328b2ae77e8fb5f4d7dcd1cf33b42b

contrib/grpc vendored

@ -1 +1 @@
Subproject commit 60c986e15cae70aade721d26badabab1f822fdd6
Subproject commit 7eac189a6badddac593580ec2ad1478bd2656fc7


@ -187,7 +187,7 @@ function(protobuf_generate_grpc)
add_custom_command(
OUTPUT ${_generated_srcs}
COMMAND protobuf::protoc
COMMAND $<TARGET_FILE:protobuf::protoc>
ARGS --${protobuf_generate_grpc_LANGUAGE}_out ${_dll_export_decl}${protobuf_generate_grpc_PROTOC_OUT_DIR}
--grpc_out ${_dll_export_decl}${protobuf_generate_grpc_PROTOC_OUT_DIR}
--plugin=protoc-gen-grpc=$<TARGET_FILE:${protobuf_generate_grpc_PLUGIN}>
@ -204,4 +204,4 @@ function(protobuf_generate_grpc)
if(protobuf_generate_grpc_TARGET)
target_sources(${protobuf_generate_grpc_TARGET} PRIVATE ${_generated_srcs_all})
endif()
endfunction()
endfunction()


@ -161,7 +161,7 @@
* JEMALLOC_DSS enables use of sbrk(2) to allocate extents from the data storage
* segment (DSS).
*/
#define JEMALLOC_DSS
/* #undef JEMALLOC_DSS */
/* Support memory filling (junk/zero). */
#define JEMALLOC_FILL


@ -1,6 +1,6 @@
find_program(AWK_PROGRAM awk)
if(NOT AWK_PROGRAM)
message(FATAL_ERROR "You need the awk program to build ClickHouse with krb5 enabled.")
message(FATAL_ERROR "You need the awk program to build ClickHouse with krb5 enabled.")
endif()
set(KRB5_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/krb5/src")

@ -1 +1 @@
Subproject commit a720b7105a610acbd7427eea475a5b6810c151eb
Subproject commit aa5429bf67a346e48ad60efd88bcefc286644bf3

contrib/libcxx vendored

@ -1 +1 @@
Subproject commit 2fa892f69acbaa40f8a18c6484854a6183a34482
Subproject commit 61e60294b1de01483caa9f5d00f437c99b674de6


@ -1,10 +0,0 @@
#include <exception>
#include <stdexcept>
int main() {
try {
throw 2;
} catch (int) {
std::throw_with_nested(std::runtime_error("test"));
}
}


@ -1,7 +0,0 @@
#include <chrono>
using std::chrono::steady_clock;
void foo(const steady_clock &clock) {
return;
}


@ -1,4 +1,4 @@
OPTION(ENABLE_SSE "enable SSE4.2 buildin function" ON)
OPTION(ENABLE_SSE "enable SSE4.2 builtin function" ON)
INCLUDE (CheckFunctionExists)
CHECK_FUNCTION_EXISTS(dladdr HAVE_DLADDR)
@ -21,30 +21,21 @@ ADD_DEFINITIONS(-D_GNU_SOURCE)
ADD_DEFINITIONS(-D_GLIBCXX_USE_NANOSLEEP)
TRY_COMPILE(STRERROR_R_RETURN_INT
${CMAKE_CURRENT_BINARY_DIR}
"${HDFS3_ROOT_DIR}/CMake/CMakeTestCompileStrerror.cpp"
${CMAKE_CURRENT_BINARY_DIR}
"${CMAKE_CURRENT_SOURCE_DIR}/CMake/CMakeTestCompileStrerror.c"
CMAKE_FLAGS "-DCMAKE_CXX_LINK_EXECUTABLE='echo not linking now...'"
OUTPUT_VARIABLE OUTPUT)
OUTPUT_VARIABLE OUTPUT)
MESSAGE(STATUS "Checking whether strerror_r returns an int")
IF(STRERROR_R_RETURN_INT)
MESSAGE(STATUS "Checking whether strerror_r returns an int -- yes")
MESSAGE(STATUS "Checking whether strerror_r returns an int -- yes")
ELSE(STRERROR_R_RETURN_INT)
MESSAGE(STATUS "Checking whether strerror_r returns an int -- no")
MESSAGE(STATUS "Checking whether strerror_r returns an int -- no")
ENDIF(STRERROR_R_RETURN_INT)
TRY_COMPILE(HAVE_STEADY_CLOCK
${CMAKE_CURRENT_BINARY_DIR}
"${HDFS3_ROOT_DIR}/CMake/CMakeTestCompileSteadyClock.cpp"
CMAKE_FLAGS "-DCMAKE_CXX_LINK_EXECUTABLE='echo not linking now...'"
OUTPUT_VARIABLE OUTPUT)
TRY_COMPILE(HAVE_NESTED_EXCEPTION
${CMAKE_CURRENT_BINARY_DIR}
"${HDFS3_ROOT_DIR}/CMake/CMakeTestCompileNestedException.cpp"
CMAKE_FLAGS "-DCMAKE_CXX_LINK_EXECUTABLE='echo not linking now...'"
OUTPUT_VARIABLE OUTPUT)
set(HAVE_STEADY_CLOCK 1)
set(HAVE_NESTED_EXCEPTION 1)
SET(HAVE_BOOST_CHRONO 0)
SET(HAVE_BOOST_ATOMIC 0)

contrib/llvm vendored

@ -1 +1 @@
Subproject commit f30bbecef78b75b527e257c1304d0be2f2f95975
Subproject commit 20607e61728e97c969e536644c3c0c1bb1a50672


@ -0,0 +1,63 @@
/* include/lber_types.h. Generated from lber_types.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* LBER types
*/
#ifndef _LBER_TYPES_H
#define _LBER_TYPES_H
#include <ldap_cdefs.h>
LDAP_BEGIN_DECL
/* LBER boolean, enum, integers (32 bits or larger) */
#define LBER_INT_T int
/* LBER tags (32 bits or larger) */
#define LBER_TAG_T long
/* LBER socket descriptor */
#define LBER_SOCKET_T int
/* LBER lengths (32 bits or larger) */
#define LBER_LEN_T long
/* ------------------------------------------------------------ */
/* booleans, enumerations, and integers */
typedef LBER_INT_T ber_int_t;
/* signed and unsigned versions */
typedef signed LBER_INT_T ber_sint_t;
typedef unsigned LBER_INT_T ber_uint_t;
/* tags */
typedef unsigned LBER_TAG_T ber_tag_t;
/* "socket" descriptors */
typedef LBER_SOCKET_T ber_socket_t;
/* lengths */
typedef unsigned LBER_LEN_T ber_len_t;
/* signed lengths */
typedef signed LBER_LEN_T ber_slen_t;
LDAP_END_DECL
#endif /* _LBER_TYPES_H */


@ -0,0 +1,74 @@
/* include/ldap_config.h. Generated from ldap_config.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* This file works in conjunction with OpenLDAP configure system.
* If you do no like the values below, adjust your configure options.
*/
#ifndef _LDAP_CONFIG_H
#define _LDAP_CONFIG_H
/* directory separator */
#ifndef LDAP_DIRSEP
#ifndef _WIN32
#define LDAP_DIRSEP "/"
#else
#define LDAP_DIRSEP "\\"
#endif
#endif
/* directory for temporary files */
#if defined(_WIN32)
# define LDAP_TMPDIR "C:\\." /* we don't have much of a choice */
#elif defined( _P_tmpdir )
# define LDAP_TMPDIR _P_tmpdir
#elif defined( P_tmpdir )
# define LDAP_TMPDIR P_tmpdir
#elif defined( _PATH_TMPDIR )
# define LDAP_TMPDIR _PATH_TMPDIR
#else
# define LDAP_TMPDIR LDAP_DIRSEP "tmp"
#endif
/* directories */
#ifndef LDAP_BINDIR
#define LDAP_BINDIR "/tmp/ldap-prefix/bin"
#endif
#ifndef LDAP_SBINDIR
#define LDAP_SBINDIR "/tmp/ldap-prefix/sbin"
#endif
#ifndef LDAP_DATADIR
#define LDAP_DATADIR "/tmp/ldap-prefix/share/openldap"
#endif
#ifndef LDAP_SYSCONFDIR
#define LDAP_SYSCONFDIR "/tmp/ldap-prefix/etc/openldap"
#endif
#ifndef LDAP_LIBEXECDIR
#define LDAP_LIBEXECDIR "/tmp/ldap-prefix/libexec"
#endif
#ifndef LDAP_MODULEDIR
#define LDAP_MODULEDIR "/tmp/ldap-prefix/libexec/openldap"
#endif
#ifndef LDAP_RUNDIR
#define LDAP_RUNDIR "/tmp/ldap-prefix/var"
#endif
#ifndef LDAP_LOCALEDIR
#define LDAP_LOCALEDIR ""
#endif
#endif /* _LDAP_CONFIG_H */


@ -0,0 +1,61 @@
/* include/ldap_features.h. Generated from ldap_features.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* LDAP Features
*/
#ifndef _LDAP_FEATURES_H
#define _LDAP_FEATURES_H 1
/* OpenLDAP API version macros */
#define LDAP_VENDOR_VERSION 20501
#define LDAP_VENDOR_VERSION_MAJOR 2
#define LDAP_VENDOR_VERSION_MINOR 5
#define LDAP_VENDOR_VERSION_PATCH X
/*
** WORK IN PROGRESS!
**
** OpenLDAP reentrancy/thread-safeness should be dynamically
** checked using ldap_get_option().
**
** The -lldap implementation is not thread-safe.
**
** The -lldap_r implementation is:
** LDAP_API_FEATURE_THREAD_SAFE (basic thread safety)
** but also be:
** LDAP_API_FEATURE_SESSION_THREAD_SAFE
** LDAP_API_FEATURE_OPERATION_THREAD_SAFE
**
** The preprocessor flag LDAP_API_FEATURE_X_OPENLDAP_THREAD_SAFE
** can be used to determine if -lldap_r is available at compile
** time. You must define LDAP_THREAD_SAFE if and only if you
** link with -lldap_r.
**
** If you fail to define LDAP_THREAD_SAFE when linking with
** -lldap_r or define LDAP_THREAD_SAFE when linking with -lldap,
** provided header definitions and declarations may be incorrect.
**
*/
/* is -lldap_r available or not */
#define LDAP_API_FEATURE_X_OPENLDAP_THREAD_SAFE 1
/* LDAP v2 Referrals */
/* #undef LDAP_API_FEATURE_X_OPENLDAP_V2_REFERRALS */
#endif /* LDAP_FEATURES */

File diff suppressed because it is too large.

contrib/protobuf vendored

@ -1 +1 @@
Subproject commit 75601841d172c73ae6bf4ce8121f42b875cdbabd
Subproject commit c1c5d02026059f4c3cb51aaa08e82288d3e08b89


@ -181,11 +181,11 @@ function(protobuf_generate)
add_custom_command(
OUTPUT ${_generated_srcs}
COMMAND protobuf::protoc
COMMAND $<TARGET_FILE:protobuf::protoc>
ARGS --${protobuf_generate_LANGUAGE}_out ${_dll_export_decl}${protobuf_generate_PROTOC_OUT_DIR} ${_dll_desc_out} ${_protobuf_include_path} ${_abs_file}
DEPENDS ${_abs_file} protobuf::protoc
COMMENT "Running ${protobuf_generate_LANGUAGE} protocol buffer compiler on ${_proto}"
VERBATIM )
VERBATIM)
endforeach()
set_source_files_properties(${_generated_srcs_all} PROPERTIES GENERATED TRUE)


@ -106,18 +106,6 @@ if(NOT MSVC)
set(CMAKE_REQUIRED_FLAGS "-msse4.2 -mpclmul")
endif()
CHECK_CXX_SOURCE_COMPILES("
#include <cstdint>
#include <nmmintrin.h>
#include <wmmintrin.h>
int main() {
volatile uint32_t x = _mm_crc32_u32(0, 0);
const auto a = _mm_set_epi64x(0, 0);
const auto b = _mm_set_epi64x(0, 0);
const auto c = _mm_clmulepi64_si128(a, b, 0x00);
auto d = _mm_cvtsi128_si64(c);
}
" HAVE_SSE42)
unset(CMAKE_REQUIRED_FLAGS)
if(HAVE_SSE42)
add_definitions(-DHAVE_SSE42)
@ -126,14 +114,7 @@ elseif(FORCE_SSE42)
message(FATAL_ERROR "FORCE_SSE42=ON but unable to compile with SSE4.2 enabled")
endif()
CHECK_CXX_SOURCE_COMPILES("
#if defined(_MSC_VER) && !defined(__thread)
#define __thread __declspec(thread)
#endif
int main() {
static __thread int tls;
}
" HAVE_THREAD_LOCAL)
set (HAVE_THREAD_LOCAL 1)
if(HAVE_THREAD_LOCAL)
add_definitions(-DROCKSDB_SUPPORT_THREAD_LOCAL)
endif()
@ -174,7 +155,7 @@ endif()
option(WITH_FALLOCATE "build with fallocate" ON)
if(WITH_FALLOCATE)
CHECK_CXX_SOURCE_COMPILES("
CHECK_C_SOURCE_COMPILES("
#include <fcntl.h>
#include <linux/falloc.h>
int main() {
@ -187,7 +168,7 @@ int main() {
endif()
endif()
CHECK_CXX_SOURCE_COMPILES("
CHECK_C_SOURCE_COMPILES("
#include <fcntl.h>
int main() {
int fd = open(\"/dev/null\", 0);
@ -198,7 +179,7 @@ if(HAVE_SYNC_FILE_RANGE_WRITE)
add_definitions(-DROCKSDB_RANGESYNC_PRESENT)
endif()
CHECK_CXX_SOURCE_COMPILES("
CHECK_C_SOURCE_COMPILES("
#include <pthread.h>
int main() {
(void) PTHREAD_MUTEX_ADAPTIVE_NP;
@ -228,6 +209,11 @@ if(HAVE_AUXV_GETAUXVAL)
add_definitions(-DROCKSDB_AUXV_GETAUXVAL_PRESENT)
endif()
check_cxx_symbol_exists(elf_aux_info sys/auxv.h HAVE_ELF_AUX_INFO)
if(HAVE_ELF_AUX_INFO)
add_definitions(-DROCKSDB_AUXV_GETAUXVAL_PRESENT)
endif()
include_directories(${ROCKSDB_SOURCE_DIR})
include_directories("${ROCKSDB_SOURCE_DIR}/include")
if(WITH_FOLLY_DISTRIBUTED_MUTEX)


@ -0,0 +1,47 @@
set (SOURCE_DIR "${CMAKE_SOURCE_DIR}/contrib/snappy")
set(SNAPPY_IS_BIG_ENDIAN 0)
include(CheckIncludeFile)
check_include_file("byteswap.h" HAVE_BYTESWAP_H)
check_include_file("sys/endian.h" HAVE_SYS_ENDIAN_H)
check_include_file("sys/mman.h" HAVE_SYS_MMAN_H)
check_include_file("sys/resource.h" HAVE_SYS_RESOURCE_H)
check_include_file("sys/time.h" HAVE_SYS_TIME_H)
check_include_file("sys/uio.h" HAVE_SYS_UIO_H)
check_include_file("unistd.h" HAVE_UNISTD_H)
check_include_file("windows.h" HAVE_WINDOWS_H)
set (HAVE_BUILTIN_EXPECT 1)
set (HAVE_BUILTIN_CTZ 1)
set (HAVE_FUNC_MMAP 1)
set (HAVE_FUNC_SYSCONF 1)
if (ARCH_AMD64 AND ENABLE_SSSE3)
set (SNAPPY_HAVE_SSSE3 1)
else ()
set (SNAPPY_HAVE_SSSE3 0)
endif ()
configure_file(
"${SOURCE_DIR}/cmake/config.h.in"
"${CMAKE_CURRENT_BINARY_DIR}/config.h")
set(HAVE_SYS_UIO_H_01 1)
configure_file(
"${SOURCE_DIR}/snappy-stubs-public.h.in"
"${CMAKE_CURRENT_BINARY_DIR}/snappy-stubs-public.h")
add_library(snappy "")
target_sources(snappy
PRIVATE
"${SOURCE_DIR}/snappy-internal.h"
"${SOURCE_DIR}/snappy-stubs-internal.h"
"${SOURCE_DIR}/snappy-c.cc"
"${SOURCE_DIR}/snappy-sinksource.cc"
"${SOURCE_DIR}/snappy-stubs-internal.cc"
"${SOURCE_DIR}/snappy.cc")
target_include_directories(snappy PUBLIC ${SOURCE_DIR} ${CMAKE_CURRENT_BINARY_DIR})
target_compile_definitions(snappy PRIVATE -DHAVE_CONFIG_H)

contrib/sysroot vendored Submodule

@ -0,0 +1 @@
Subproject commit 611d3315e9e369a338de4ffa128eb87b4fb87dec


@ -93,10 +93,6 @@ RUN git clone https://github.com/tpoechtrager/cctools-port.git \
# Download toolchain and SDK for Darwin
RUN wget -nv https://github.com/phracker/MacOSX-SDKs/releases/download/11.3/MacOSX11.0.sdk.tar.xz
# Download toolchain for ARM
# It contains all required headers and libraries. Note that it's named as "gcc" but actually we are using clang for cross compiling.
RUN wget -nv "https://developer.arm.com/-/media/Files/downloads/gnu-a/8.3-2019.03/binrel/gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz?revision=2e88a73f-d233-4f96-b1f4-d8b36e9bb0b9&la=en" -O gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz
# Download toolchain for FreeBSD 11.3
RUN wget -nv https://clickhouse-datasets.s3.yandex.net/toolchains/toolchains/freebsd-11.3-toolchain.tar.xz


@ -6,9 +6,6 @@ mkdir -p build/cmake/toolchain/darwin-x86_64
tar xJf MacOSX11.0.sdk.tar.xz -C build/cmake/toolchain/darwin-x86_64 --strip-components=1
ln -sf darwin-x86_64 build/cmake/toolchain/darwin-aarch64
mkdir -p build/cmake/toolchain/linux-aarch64
tar xJf gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz -C build/cmake/toolchain/linux-aarch64 --strip-components=1
mkdir -p build/cmake/toolchain/freebsd-x86_64
tar xJf freebsd-11.3-toolchain.tar.xz -C build/cmake/toolchain/freebsd-x86_64 --strip-components=1


@ -67,7 +67,7 @@ RUN apt-get update \
unixodbc \
--yes --no-install-recommends
RUN pip3 install numpy scipy pandas Jinja2
RUN pip3 install numpy scipy pandas Jinja2 pandas clickhouse_driver
# This symlink required by gcc to find lld compiler
RUN ln -s /usr/bin/lld-${LLVM_VERSION} /usr/bin/ld.lld


@ -27,7 +27,7 @@ RUN apt-get update \
&& apt-get clean \
&& rm -rf /var/lib/apt/lists/*
RUN pip3 install Jinja2
RUN pip3 install Jinja2 pandas clickhouse_driver
COPY * /


@ -34,7 +34,7 @@ RUN apt-get update -y \
postgresql-client \
sqlite3
RUN pip3 install numpy scipy pandas Jinja2
RUN pip3 install numpy scipy pandas Jinja2 clickhouse_driver
RUN mkdir -p /tmp/clickhouse-odbc-tmp \
&& wget -nv -O - ${odbc_driver_url} | tar --strip-components=1 -xz -C /tmp/clickhouse-odbc-tmp \


@ -10,7 +10,7 @@ RUN apt-get update && env DEBIAN_FRONTEND=noninteractive apt-get install --yes \
python3-pip \
pylint \
yamllint \
&& pip3 install codespell
&& pip3 install codespell pandas clickhouse_driver
COPY run.sh /
COPY process_style_check_result.py /


@ -9,15 +9,11 @@ This is for the case when you have Linux machine and want to use it to build `cl
The cross-build for AARCH64 is based on the [Build instructions](../development/build.md), follow them first.
## Install Clang-8 {#install-clang-8}
## Install Clang-13
Follow the instructions from https://apt.llvm.org/ for your Ubuntu or Debian setup.
For example, in Ubuntu Bionic you can use the following commands:
``` bash
echo "deb [trusted=yes] http://apt.llvm.org/bionic/ llvm-toolchain-bionic-8 main" | sudo tee /etc/apt/sources.list.d/llvm.list
sudo apt-get update
sudo apt-get install clang-8
Follow the instructions from https://apt.llvm.org/ for your Ubuntu or Debian setup, or run:
```
sudo bash -c "$(wget -O - https://apt.llvm.org/llvm.sh)"
```
## Install Cross-Compilation Toolset {#install-cross-compilation-toolset}
@ -34,7 +30,7 @@ tar xJf gcc-arm-8.3-2019.03-x86_64-aarch64-linux-gnu.tar.xz -C build-aarch64/cma
``` bash
cd ClickHouse
mkdir build-arm64
CC=clang-8 CXX=clang++-8 cmake . -Bbuild-arm64 -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-aarch64.cmake
CC=clang-13 CXX=clang++-13 cmake . -Bbuild-arm64 -DCMAKE_TOOLCHAIN_FILE=cmake/linux/toolchain-aarch64.cmake
ninja -C build-arm64
```


@ -162,5 +162,6 @@ toc_title: Adopters
| <a href="https://zagravagames.com/en/" class="favicon">Zagrava Trading</a> | — | — | — | — | [Job offer, May 2021](https://twitter.com/datastackjobs/status/1394707267082063874) |
| <a href="https://beeline.ru/" class="favicon">Beeline</a> | Telecom | Data Platform | — | — | [Blog post, July 2021](https://habr.com/en/company/beeline/blog/567508/) |
| <a href="https://ecommpay.com/" class="favicon">Ecommpay</a> | Payment Processing | Logs | — | — | [Video, Nov 2019](https://www.youtube.com/watch?v=d3GdZTOWGLk) |
| <a href="https://omnicomm.ru/" class="favicon">Omnicomm</a> | Transportation Monitoring | — | — | — | [Facebook post, Oct 2021](https://www.facebook.com/OmnicommTeam/posts/2824479777774500) |
[Original article](https://clickhouse.com/docs/en/introduction/adopters/) <!--hide-->


@ -28,7 +28,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
@ -424,20 +424,19 @@ private:
if (reconnect)
connection.disconnect();
RemoteBlockInputStream stream(
RemoteQueryExecutor executor(
connection, query, {}, global_context, nullptr, Scalars(), Tables(), query_processing_stage);
if (!query_id.empty())
stream.setQueryId(query_id);
executor.setQueryId(query_id);
Progress progress;
stream.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
stream.readPrefix();
while (Block block = stream.read());
BlockStreamProfileInfo info;
while (Block block = executor.read())
info.update(block);
stream.readSuffix();
const BlockStreamProfileInfo & info = stream.getProfileInfo();
executor.finish();
double seconds = watch.elapsedSeconds();


@ -14,7 +14,6 @@
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
@ -87,7 +86,7 @@ decltype(auto) ClusterCopier::retry(T && func, UInt64 max_tries)
if (try_number < max_tries)
{
tryLogCurrentException(log, "Will retry");
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
}
}
@ -310,7 +309,7 @@ void ClusterCopier::process(const ConnectionTimeouts & timeouts)
/// Retry table processing
bool table_is_done = false;
for (UInt64 num_table_tries = 0; num_table_tries < max_table_tries; ++num_table_tries)
for (UInt64 num_table_tries = 1; num_table_tries <= max_table_tries; ++num_table_tries)
{
if (tryProcessTable(timeouts, task_table))
{
@ -341,7 +340,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
const String & description,
bool unprioritized)
{
std::chrono::milliseconds current_sleep_time = default_sleep_time;
std::chrono::milliseconds current_sleep_time = retry_delay_ms;
static constexpr std::chrono::milliseconds max_sleep_time(30000); // 30 sec
if (unprioritized)
@ -367,7 +366,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
LOG_INFO(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
current_sleep_time = std::min(max_sleep_time, current_sleep_time + retry_delay_ms);
std::this_thread::sleep_for(current_sleep_time);
num_bad_version_errors = 0;
@ -786,7 +785,7 @@ bool ClusterCopier::tryDropPartitionPiece(
if (e.code == Coordination::Error::ZNODEEXISTS)
{
LOG_INFO(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
return false;
}
@ -799,7 +798,7 @@ bool ClusterCopier::tryDropPartitionPiece(
if (stat.numChildren != 0)
{
LOG_INFO(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
return false;
}
else
@ -1006,7 +1005,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
task_status = TaskStatus::Error;
bool was_error = false;
has_shard_to_process = true;
for (UInt64 try_num = 0; try_num < max_shard_partition_tries; ++try_num)
for (UInt64 try_num = 1; try_num <= max_shard_partition_tries; ++try_num)
{
task_status = tryProcessPartitionTask(timeouts, partition, is_unprioritized_task);
@ -1021,7 +1020,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
break;
/// Repeat on errors
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
if (task_status == TaskStatus::Error)
@ -1069,7 +1068,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
break;
/// Repeat on errors.
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
catch (...)
{
@ -1110,7 +1109,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
if (!table_is_done)
{
LOG_INFO(log, "Table {} is not processed yet.Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
LOG_INFO(log, "Table {} is not processed yet. Copied {} of {}, will retry", task_table.table_id, finished_partitions, required_partitions);
}
else
{
@ -1213,7 +1212,7 @@ TaskStatus ClusterCopier::iterateThroughAllPiecesInPartition(const ConnectionTim
break;
/// Repeat on errors
std::this_thread::sleep_for(default_sleep_time);
std::this_thread::sleep_for(retry_delay_ms);
}
was_active_pieces = (res == TaskStatus::Active);


@ -65,6 +65,23 @@ public:
experimental_use_sample_offset = value;
}
void setMaxTableTries(UInt64 tries)
{
max_table_tries = tries;
}
void setMaxShardPartitionTries(UInt64 tries)
{
max_shard_partition_tries = tries;
}
void setMaxShardPartitionPieceTriesForAlter(UInt64 tries)
{
max_shard_partition_piece_tries_for_alter = tries;
}
void setRetryDelayMs(std::chrono::milliseconds ms)
{
retry_delay_ms = ms;
}
protected:
String getWorkersPath() const
@ -123,10 +140,6 @@ protected:
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
TaskStatus tryCreateDestinationTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@ -218,6 +231,9 @@ private:
Poco::Logger * log;
std::chrono::milliseconds default_sleep_time{1000};
UInt64 max_table_tries = 3;
UInt64 max_shard_partition_tries = 3;
UInt64 max_shard_partition_piece_tries_for_alter = 10;
std::chrono::milliseconds retry_delay_ms{1000};
};
}


@ -31,6 +31,10 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
move_fault_probability = std::max(std::min(config().getDouble("move-fault-probability"), 1.0), 0.0);
base_dir = (config().has("base-dir")) ? config().getString("base-dir") : fs::current_path().string();
max_table_tries = std::max<size_t>(config().getUInt("max-table-tries", 3), 1);
max_shard_partition_tries = std::max<size_t>(config().getUInt("max-shard-partition-tries", 3), 1);
max_shard_partition_piece_tries_for_alter = std::max<size_t>(config().getUInt("max-shard-partition-piece-tries-for-alter", 10), 1);
retry_delay_ms = std::chrono::milliseconds(std::max<size_t>(config().getUInt("retry-delay-ms", 1000), 100));
if (config().has("experimental-use-sample-offset"))
experimental_use_sample_offset = config().getBool("experimental-use-sample-offset");
@ -100,6 +104,15 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
options.addOption(Poco::Util::Option("max-table-tries", "", "Number of tries for the copy table task")
.argument("max-table-tries").binding("max-table-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-tries", "", "Number of tries for the copy one partition task")
.argument("max-shard-partition-tries").binding("max-shard-partition-tries"));
options.addOption(Poco::Util::Option("max-shard-partition-piece-tries-for-alter", "", "Number of tries for final ALTER ATTACH to destination table")
.argument("max-shard-partition-piece-tries-for-alter").binding("max-shard-partition-piece-tries-for-alter"));
options.addOption(Poco::Util::Option("retry-delay-ms", "", "Delay between task retries")
.argument("retry-delay-ms").binding("retry-delay-ms"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
.callback(Poco::Util::OptionCallback<Me>(this, &Me::handleHelp)));
@ -161,7 +174,10 @@ void ClusterCopierApp::mainImpl()
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);
copier->setMaxTableTries(max_table_tries);
copier->setMaxShardPartitionTries(max_shard_partition_tries);
copier->setMaxShardPartitionPieceTriesForAlter(max_shard_partition_piece_tries_for_alter);
copier->setRetryDelayMs(retry_delay_ms);
copier->setExperimentalUseSampleOffset(experimental_use_sample_offset);
auto task_file = config().getString("task-file", "");


@ -83,6 +83,11 @@ private:
double move_fault_probability = 0.0;
bool is_help = false;
UInt64 max_table_tries = 3;
UInt64 max_shard_partition_tries = 3;
UInt64 max_shard_partition_piece_tries_for_alter = 10;
std::chrono::milliseconds retry_delay_ms{1000};
bool experimental_use_sample_offset{false};
std::string base_dir;


@ -49,7 +49,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteBlockInputStream.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/NullBlockOutputStream.h>


@ -1036,6 +1036,10 @@ if (ThreadFuzzer::instance().isEffective())
server.start();
SCOPE_EXIT({
/// Stop reloading of the main config. This must be done before `global_context->shutdown()` because
/// otherwise the reloading may pass a changed config to some destroyed parts of ContextSharedPart.
main_config_reloader.reset();
/** Ask to cancel background jobs all table engines,
* and also query_log.
* It is important to do early, not in destructor of Context, because
@ -1076,9 +1080,6 @@ if (ThreadFuzzer::instance().isEffective())
/// Wait server pool to avoid use-after-free of destroyed context in the handlers
server_pool.joinAll();
// Uses a raw pointer to global context for getting ZooKeeper.
main_config_reloader.reset();
/** Explicitly destroy Context. It is more convenient than in destructor of Server, because logger is still available.
* At this moment, no one could own shared part of Context.
*/
@ -1138,18 +1139,7 @@ if (ThreadFuzzer::instance().isEffective())
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
///
/// It also cannot work with sanitizers.
/// Sanitizers are using quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// And they do unwinding frequently (on every malloc/free, thread/mutex operations, etc).
/// They change %rbp during unwinding and it confuses libunwind if signal comes during sanitizer unwinding
/// and query profiler decide to unwind stack with libunwind at this moment.
///
/// Symptoms: you'll get silent Segmentation Fault - without sanitizer message and without usual ClickHouse diagnostics.
///
/// Look at compiler-rt/lib/sanitizer_common/sanitizer_stacktrace.h
///
#if USE_UNWIND && !WITH_COVERAGE && !defined(SANITIZER) && defined(__x86_64__)
#if USE_UNWIND && !WITH_COVERAGE && defined(__x86_64__)
/// Profilers cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
{
@ -1181,7 +1171,7 @@ if (ThreadFuzzer::instance().isEffective())
#endif
#if defined(SANITIZER)
LOG_INFO(log, "Query Profiler and TraceCollector are disabled because they cannot work under sanitizers"
LOG_INFO(log, "Query Profiler disabled because they cannot work under sanitizers"
" when two different stack unwinding methods will interfere with each other.");
#endif

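The first hunk of this file moves `main_config_reloader.reset()` to the start of the `SCOPE_EXIT` block, so the reloader is destroyed before `global_context->shutdown()`; otherwise a config reload racing with shutdown could hand a changed config to already-destroyed parts of `ContextSharedPart`. A minimal, self-contained sketch of that ordering guarantee, using a hypothetical `ScopeGuard` in place of ClickHouse's `SCOPE_EXIT` macro:

```cpp
#include <cstdio>
#include <functional>
#include <memory>

/// Hypothetical stand-in for ClickHouse's SCOPE_EXIT macro:
/// runs a callback when the enclosing scope unwinds.
struct ScopeGuard
{
    std::function<void()> fn;
    ~ScopeGuard() { fn(); }
};

struct Reloader { ~Reloader() { std::puts("reloader stopped"); } };
struct Context  { void shutdown() { std::puts("context shut down"); } };

int main()
{
    auto context = std::make_shared<Context>();
    auto reloader = std::make_unique<Reloader>();

    ScopeGuard on_exit{[&]
    {
        /// Stop reloading first, so no reload callback can touch the context
        /// while (or after) it is being torn down...
        reloader.reset();
        /// ...and only then shut the context down.
        context->shutdown();
    }};

    /// ... run the server ...
}   /// Prints "reloader stopped", then "context shut down".
```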

@ -62,6 +62,27 @@
-->
</logger>
<!-- Add headers to response in options request. OPTIONS method is used in CORS preflight requests. -->
<!-- It is off by default. Next headers are obligate for CORS.-->
<!-- http_options_response>
<header>
<name>Access-Control-Allow-Origin</name>
<value>*</value>
</header>
<header>
<name>Access-Control-Allow-Headers</name>
<value>origin, x-requested-with</value>
</header>
<header>
<name>Access-Control-Allow-Methods</name>
<value>POST, GET, OPTIONS</value>
</header>
<header>
<name>Access-Control-Max-Age</name>
<value>86400</value>
</header>
</http_options_response -->
<!-- It is the name that will be shown in the clickhouse-client.
By default, anything with "production" will be highlighted in red in query prompt.
-->


@ -48,7 +48,7 @@
#include <IO/CompressionMethod.h>
#include <DataStreams/NullBlockOutputStream.h>
#include <DataStreams/InternalTextLogsRowOutputStream.h>
#include <DataStreams/InternalTextLogs.h>
namespace fs = std::filesystem;
@ -95,6 +95,9 @@ void interruptSignalHandler(int signum)
_exit(signum);
}
ClientBase::~ClientBase() = default;
ClientBase::ClientBase() = default;
void ClientBase::setupSignalHandler()
{
exit_on_signal.test_and_set();
@ -393,8 +396,7 @@ void ClientBase::initLogsOutputStream()
}
}
logs_out_stream = std::make_shared<InternalTextLogsRowOutputStream>(*wb, stdout_is_a_tty);
logs_out_stream->writePrefix();
logs_out_stream = std::make_unique<InternalTextLogs>(*wb, stdout_is_a_tty);
}
}
@ -426,10 +428,8 @@ void ClientBase::processTextAsSingleQuery(const String & full_query)
catch (Exception & e)
{
if (!is_interactive)
{
e.addMessage("(in query: {})", full_query);
throw;
}
throw;
}
if (have_error)
@ -641,9 +641,6 @@ void ClientBase::onEndOfStream()
if (block_out_stream)
block_out_stream->writeSuffix();
if (logs_out_stream)
logs_out_stream->writeSuffix();
resetOutput();
if (is_interactive && !written_first_block)


@ -32,12 +32,17 @@ enum MultiQueryProcessingStage
void interruptSignalHandler(int signum);
class InternalTextLogs;
class ClientBase : public Poco::Util::Application
{
public:
using Arguments = std::vector<String>;
ClientBase();
~ClientBase() override;
void init(int argc, char ** argv);
protected:
@ -177,7 +182,7 @@ protected:
/// The user could specify special file for server logs (stderr by default)
std::unique_ptr<WriteBuffer> out_logs_buf;
String server_logs_file;
BlockOutputStreamPtr logs_out_stream;
std::unique_ptr<InternalTextLogs> logs_out_stream;
String home_path;
String history_file; /// Path to a file containing command history.

View File

@ -9,8 +9,8 @@
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/TimeoutSetter.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include <Common/ClickHouseRevision.h>
@ -58,6 +58,35 @@ namespace ErrorCodes
extern const int EMPTY_DATA_PASSED;
}
Connection::~Connection() = default;
Connection::Connection(const String & host_, UInt16 port_,
const String & default_database_,
const String & user_, const String & password_,
const String & cluster_,
const String & cluster_secret_,
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Poco::Timespan sync_request_timeout_)
: host(host_), port(port_), default_database(default_database_)
, user(user_), password(password_)
, cluster(cluster_)
, cluster_secret(cluster_secret_)
, client_name(client_name_)
, compression(compression_)
, secure(secure_)
, sync_request_timeout(sync_request_timeout_)
, log_wrapper(*this)
{
/// Don't connect immediately, only on first need.
if (user.empty())
user = "default";
setDescription();
}
void Connection::connect(const ConnectionTimeouts & timeouts)
{
@ -533,11 +562,11 @@ void Connection::sendData(const Block & block, const String & name, bool scalar)
if (!block_out)
{
if (compression == Protocol::Compression::Enable)
maybe_compressed_out = std::make_shared<CompressedWriteBuffer>(*out, compression_codec);
maybe_compressed_out = std::make_unique<CompressedWriteBuffer>(*out, compression_codec);
else
maybe_compressed_out = out;
block_out = std::make_shared<NativeBlockOutputStream>(*maybe_compressed_out, server_revision, block.cloneEmpty());
block_out = std::make_unique<NativeWriter>(*maybe_compressed_out, server_revision, block.cloneEmpty());
}
if (scalar)
@ -866,18 +895,18 @@ Packet Connection::receivePacket()
Block Connection::receiveData()
{
initBlockInput();
return receiveDataImpl(block_in);
return receiveDataImpl(*block_in);
}
Block Connection::receiveLogData()
{
initBlockLogsInput();
return receiveDataImpl(block_logs_in);
return receiveDataImpl(*block_logs_in);
}
Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
Block Connection::receiveDataImpl(NativeReader & reader)
{
String external_table_name;
readStringBinary(external_table_name, *in);
@ -885,7 +914,7 @@ Block Connection::receiveDataImpl(BlockInputStreamPtr & stream)
size_t prev_bytes = in->count();
/// Read one block from network.
Block res = stream->read();
Block res = reader.read();
if (throttler)
throttler->add(in->count() - prev_bytes);
@ -912,7 +941,7 @@ void Connection::initBlockInput()
maybe_compressed_in = in;
}
block_in = std::make_shared<NativeBlockInputStream>(*maybe_compressed_in, server_revision);
block_in = std::make_unique<NativeReader>(*maybe_compressed_in, server_revision);
}
}
@ -922,7 +951,7 @@ void Connection::initBlockLogsInput()
if (!block_logs_in)
{
/// Have to return superset of SystemLogsQueue::getSampleBlock() columns
block_logs_in = std::make_shared<NativeBlockInputStream>(*in, server_revision);
block_logs_in = std::make_unique<NativeReader>(*in, server_revision);
}
}


@ -32,6 +32,9 @@ struct ConnectionParameters;
using ConnectionPtr = std::shared_ptr<Connection>;
using Connections = std::vector<ConnectionPtr>;
class NativeReader;
class NativeWriter;
/** Connection with database server, to use by client.
* How to use - see Core/Protocol.h
@ -53,25 +56,9 @@ public:
const String & client_name_,
Protocol::Compression compression_,
Protocol::Secure secure_,
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0))
:
host(host_), port(port_), default_database(default_database_),
user(user_), password(password_),
cluster(cluster_),
cluster_secret(cluster_secret_),
client_name(client_name_),
compression(compression_),
secure(secure_),
sync_request_timeout(sync_request_timeout_),
log_wrapper(*this)
{
/// Don't connect immediately, only on first need.
Poco::Timespan sync_request_timeout_ = Poco::Timespan(DBMS_DEFAULT_SYNC_REQUEST_TIMEOUT_SEC, 0));
if (user.empty())
user = "default";
setDescription();
}
~Connection() override;
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
@ -217,12 +204,12 @@ private:
/// From where to read query execution result.
std::shared_ptr<ReadBuffer> maybe_compressed_in;
BlockInputStreamPtr block_in;
BlockInputStreamPtr block_logs_in;
std::unique_ptr<NativeReader> block_in;
std::unique_ptr<NativeReader> block_logs_in;
/// Where to write data for INSERT.
std::shared_ptr<WriteBuffer> maybe_compressed_out;
BlockOutputStreamPtr block_out;
std::unique_ptr<NativeWriter> block_out;
/// Logger is created lazily, for avoid to run DNS request in constructor.
class LoggerWrapper
@ -261,7 +248,7 @@ private:
Block receiveData();
Block receiveLogData();
Block receiveDataImpl(BlockInputStreamPtr & stream);
Block receiveDataImpl(NativeReader & reader);
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;


@ -144,7 +144,7 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.erase(arr.begin() + pos);
fprintf(stderr, "erased\n");
std::cerr << "erased\n";
}
if (fuzz_rand() % 5 == 0)
@ -153,12 +153,12 @@ Field QueryFuzzer::fuzzField(Field field)
{
size_t pos = fuzz_rand() % arr.size();
arr.insert(arr.begin() + pos, fuzzField(arr[pos]));
fprintf(stderr, "inserted (pos %zd)\n", pos);
std::cerr << fmt::format("inserted (pos {})\n", pos);
}
else
{
arr.insert(arr.begin(), getRandomField(0));
fprintf(stderr, "inserted (0)\n");
std::cerr << "inserted (0)\n";
}
}
@ -278,7 +278,7 @@ void QueryFuzzer::fuzzOrderByList(IAST * ast)
}
else
{
fprintf(stderr, "no random col!\n");
std::cerr << "No random column.\n";
}
}
@ -312,13 +312,9 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
: impl->children.begin() + fuzz_rand() % impl->children.size();
auto col = getRandomColumnLike();
if (col)
{
impl->children.insert(pos, col);
}
else
{
fprintf(stderr, "no random col!\n");
}
std::cerr << "No random column.\n";
}
// We don't have to recurse here to fuzz the children, this is handled by


@ -2,7 +2,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/MaskOperations.h>
#include <Common/assert_cast.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBufferFromArena.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>


@ -13,7 +13,7 @@
#include <base/unaligned.h>
#include <base/sort.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Exception.h>
#include <Common/Arena.h>


@ -16,7 +16,7 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
template <typename T> bool decimalLess(T x, T y, UInt32 x_scale, UInt32 y_scale);


@ -2,7 +2,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
@ -248,31 +248,23 @@ ColumnPtr ColumnFixedString::filter(const IColumn::Filter & filt, ssize_t result
UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
if (0 == mask)
{
/// Nothing is inserted.
data_pos += chars_per_simd_elements;
}
else if (0xFFFF == mask)
if (0xFFFF == mask)
{
res->chars.insert(data_pos, data_pos + chars_per_simd_elements);
data_pos += chars_per_simd_elements;
}
else
{
size_t res_chars_size = res->chars.size();
for (size_t i = 0; i < SIMD_BYTES; ++i)
while (mask)
{
if (filt_pos[i])
{
res->chars.resize(res_chars_size + n);
memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos, n);
res_chars_size += n;
}
data_pos += n;
size_t index = __builtin_ctz(mask);
res->chars.resize(res_chars_size + n);
memcpySmallAllowReadWriteOverflow15(&res->chars[res_chars_size], data_pos + index * n, n);
res_chars_size += n;
mask = mask & (mask - 1);
}
}
data_pos += chars_per_simd_elements;
filt_pos += SIMD_BYTES;
}
#endif
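
The rewritten filter loop above visits only the set bits of the SIMD mask instead of re-testing all 16 filter bytes. A minimal standalone sketch of that bit-iteration idiom (the function name and return type are illustrative, not part of the patch):

#include <cstddef>
#include <cstdint>
#include <vector>

/// Return the positions of all set bits in a 16-bit mask, lowest first.
/// __builtin_ctz yields the index of the lowest set bit, and
/// mask & (mask - 1) clears it, so the loop body runs once per
/// selected element rather than once per byte of the SIMD word.
std::vector<size_t> setBitPositions(uint16_t mask)
{
    std::vector<size_t> positions;
    while (mask)
    {
        positions.push_back(__builtin_ctz(mask));
        mask = mask & (mask - 1);
    }
    return positions;
}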


@ -2,7 +2,7 @@
#include <Columns/ColumnString.h>
#include <Columns/ColumnsNumber.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <DataTypes/NumberTraits.h>
#include <Common/HashTable/HashMap.h>
#include <Common/WeakHash.h>


@ -1,7 +1,7 @@
#include <Columns/ColumnMap.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/IColumnImpl.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <base/map.h>


@ -8,7 +8,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnCompressed.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
namespace DB


@ -4,7 +4,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <Common/Arena.h>
#include <Common/HashTable/Hash.h>
#include <Common/WeakHash.h>


@ -3,7 +3,7 @@
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnCompressed.h>
#include <Core/Field.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/Operators.h>
#include <IO/WriteBufferFromString.h>
#include <Common/WeakHash.h>


@ -4,7 +4,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnCompressed.h>
#include <Columns/MaskOperations.h>
#include <DataStreams/ColumnGathererStream.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteHelpers.h>
#include <Common/Arena.h>
#include <Common/Exception.h>
@ -327,19 +327,18 @@ ColumnPtr ColumnVector<T>::filter(const IColumn::Filter & filt, ssize_t result_s
UInt16 mask = _mm_movemask_epi8(_mm_cmpeq_epi8(_mm_loadu_si128(reinterpret_cast<const __m128i *>(filt_pos)), zero16));
mask = ~mask;
if (0 == mask)
{
/// Nothing is inserted.
}
else if (0xFFFF == mask)
if (0xFFFF == mask)
{
res_data.insert(data_pos, data_pos + SIMD_BYTES);
}
else
{
for (size_t i = 0; i < SIMD_BYTES; ++i)
if (filt_pos[i])
res_data.push_back(data_pos[i]);
while (mask)
{
size_t index = __builtin_ctz(mask);
res_data.push_back(data_pos[index]);
mask = mask & (mask - 1);
}
}
filt_pos += SIMD_BYTES;


@ -241,11 +241,7 @@ namespace
zero_vec));
mask = ~mask;
if (mask == 0)
{
/// SIMD_BYTES consecutive rows do not pass the filter
}
else if (mask == 0xffff)
if (mask == 0xffff)
{
/// SIMD_BYTES consecutive rows pass the filter
const auto first = offsets_pos == offsets_begin;
@ -262,9 +258,12 @@ namespace
}
else
{
for (size_t i = 0; i < SIMD_BYTES; ++i)
if (filt_pos[i])
copy_array(offsets_pos + i);
while (mask)
{
size_t index = __builtin_ctz(mask);
copy_array(offsets_pos + index);
mask = mask & (mask - 1);
}
}
filt_pos += SIMD_BYTES;


@ -94,7 +94,7 @@ class JSONMap : public IItem
};
public:
void add(std::string key, ItemPtr value) { values.emplace_back(Pair{.key = std::move(key), .value = std::move(value)}); }
void add(std::string key, ItemPtr value) { values.emplace_back(Pair{.key = std::move(key), .value = std::move(value)}); } //-V1030
void add(std::string key, std::string value) { add(std::move(key), std::make_unique<JSONString>(std::move(value))); }
void add(std::string key, const char * value) { add(std::move(key), std::make_unique<JSONString>(value)); }
void add(std::string key, std::string_view value) { add(std::move(key), std::make_unique<JSONString>(value)); }


@ -82,7 +82,21 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
: log(&Poco::Logger::get("QueryProfiler"))
, pause_signal(pause_signal_)
{
#if USE_UNWIND
#if defined(SANITIZER)
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception("QueryProfiler disabled because they cannot work under sanitizers", ErrorCodes::NOT_IMPLEMENTED);
#elif !USE_UNWIND
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception("QueryProfiler cannot work with stock libunwind", ErrorCodes::NOT_IMPLEMENTED);
#else
/// Sanity check.
if (!hasPHDRCache())
throw Exception("QueryProfiler cannot be used without PHDR cache, that is not available for TSan build", ErrorCodes::NOT_IMPLEMENTED);
@ -144,13 +158,6 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
tryCleanup();
throw;
}
#else
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception("QueryProfiler cannot work with stock libunwind", ErrorCodes::NOT_IMPLEMENTED);
#endif
}


@ -186,6 +186,8 @@ static void * getCallerAddress(const ucontext_t & context)
#elif defined(__APPLE__) && defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext->__ss.__pc);
#elif defined(__FreeBSD__) && defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext.mc_gpregs.gp_elr);
#elif defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext.pc);
#elif defined(__powerpc64__)


@ -44,7 +44,7 @@ namespace
struct ThreadStack
{
ThreadStack()
: data(aligned_alloc(getPageSize(), size))
: data(aligned_alloc(getPageSize(), getSize()))
{
/// Add a guard page
/// (and since the stack grows downward, we need to protect the first page).
@ -56,12 +56,11 @@ struct ThreadStack
free(data);
}
static size_t getSize() { return size; }
static size_t getSize() { return std::max<size_t>(16 << 10, MINSIGSTKSZ); }
void * getData() const { return data; }
private:
/// 16 KiB - not too big but enough to handle error.
static constexpr size_t size = std::max<size_t>(16 << 10, MINSIGSTKSZ);
void * data;
};
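
A stack like this is what sigaltstack consumes. A rough sketch of allocating and installing an alternate signal stack with the same runtime sizing (hypothetical helper, assuming POSIX and glibc; aligned_alloc wants the size rounded to a multiple of the alignment):

#include <algorithm>
#include <csignal>
#include <cstdlib>
#include <unistd.h>

/// Allocate a page-aligned buffer and install it as this thread's
/// alternate signal stack. Returns false on failure.
bool installAltSignalStack()
{
    const size_t page = static_cast<size_t>(sysconf(_SC_PAGESIZE));
    size_t size = std::max<size_t>(16 << 10, MINSIGSTKSZ);
    size = (size + page - 1) / page * page; /// round up for aligned_alloc
    void * data = aligned_alloc(page, size);
    if (!data)
        return false;
    stack_t altstack{};
    altstack.ss_sp = data;
    altstack.ss_size = size;
    altstack.ss_flags = 0;
    return sigaltstack(&altstack, nullptr) == 0;
}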


@ -289,7 +289,7 @@ ZooKeeper::~ZooKeeper()
{
try
{
finalize(false, false);
finalize(false, false, "destructor called");
if (send_thread.joinable())
send_thread.join();
@ -299,7 +299,7 @@ ZooKeeper::~ZooKeeper()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
@ -317,6 +317,7 @@ ZooKeeper::ZooKeeper(
session_timeout(session_timeout_),
operation_timeout(std::min(operation_timeout_, session_timeout_))
{
log = &Poco::Logger::get("ZooKeeperClient");
std::atomic_store(&zk_log, std::move(zk_log_));
if (!root_path.empty())
@ -450,6 +451,10 @@ void ZooKeeper::connect(
message << fail_reasons.str() << "\n";
throw Exception(message.str(), Error::ZCONNECTIONLOSS);
}
else
{
LOG_TEST(log, "Connected to ZooKeeper at {} with session_id {}", socket.peerAddress().toString(), session_id);
}
}
@ -604,8 +609,8 @@ void ZooKeeper::sendThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(true, false);
tryLogCurrentException(log);
finalize(true, false, "exception in sendThread");
}
}
@ -663,8 +668,8 @@ void ZooKeeper::receiveThread()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
finalize(false, true);
tryLogCurrentException(log);
finalize(false, true, "exception in receiveThread");
}
}
@ -799,7 +804,7 @@ void ZooKeeper::receiveEvent()
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
/// Unrecoverable. Don't leave incorrect state in memory.
if (!response)
@ -819,7 +824,7 @@ void ZooKeeper::receiveEvent()
catch (...)
{
/// Throw initial exception, not exception from callback.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
throw;
@ -832,10 +837,15 @@ void ZooKeeper::receiveEvent()
}
void ZooKeeper::finalize(bool error_send, bool error_receive)
void ZooKeeper::finalize(bool error_send, bool error_receive, const String & reason)
{
/// If some thread (send/receive) already finalizing session don't try to do it
if (finalization_started.exchange(true))
bool already_started = finalization_started.exchange(true);
LOG_TEST(log, "Finalizing session {}: finalization_started={}, queue_closed={}, reason={}",
session_id, already_started, requests_queue.isClosed(), reason);
if (already_started)
return;
auto expire_session_if_not_expired = [&]
@ -860,7 +870,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
/// This happens, for example, when "Cannot push request to queue within operation timeout".
/// Just mark session expired in case of error on close request, otherwise sendThread may not stop.
expire_session_if_not_expired();
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
/// Send thread will exit after sending close request or on expired flag
@ -879,7 +889,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to execute all callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
if (!error_receive && receive_thread.joinable())
@ -908,7 +918,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
catch (...)
{
/// We must continue to all other callbacks, because the user is waiting for them.
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -939,7 +949,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -967,7 +977,7 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
@ -983,14 +993,14 @@ void ZooKeeper::finalize(bool error_send, bool error_receive)
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
}
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
tryLogCurrentException(log);
}
}
@ -1028,7 +1038,7 @@ void ZooKeeper::pushRequest(RequestInfo && info)
}
catch (...)
{
finalize(false, false);
finalize(false, false, getCurrentExceptionMessage(false, false, false));
throw;
}
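
All of these call sites funnel into one guarded shutdown: the atomic exchange lets only the first caller do the work, and the reason string exists only for the log line. A condensed sketch of the pattern with illustrative names, not the full ZooKeeper client:

#include <atomic>
#include <iostream>
#include <string>

class Session
{
public:
    /// Safe to call concurrently from the send thread, the receive
    /// thread and the destructor: later callers return immediately.
    void finalize(const std::string & reason)
    {
        if (finalization_started.exchange(true))
            return;
        std::cerr << "Finalizing session, reason: " << reason << '\n';
        /// ... expire the session, stop threads, fail pending callbacks ...
    }

private:
    std::atomic<bool> finalization_started{false};
};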


@ -187,7 +187,7 @@ public:
/// it will do read in another session, that read may not see the
/// already performed write.
void finalize() override { finalize(false, false); }
void finalize() override { finalize(false, false, "unknown"); }
void setZooKeeperLog(std::shared_ptr<DB::ZooKeeperLog> zk_log_);
@ -240,6 +240,8 @@ private:
ThreadFromGlobalPool send_thread;
ThreadFromGlobalPool receive_thread;
Poco::Logger * log;
void connect(
const Nodes & node,
Poco::Timespan connection_timeout);
@ -257,7 +259,7 @@ private:
void close();
/// Call all remaining callbacks and watches, passing errors to them.
void finalize(bool error_send, bool error_receive);
void finalize(bool error_send, bool error_receive, const String & reason);
template <typename T>
void write(const T &);


@ -64,3 +64,18 @@
/// Max depth of hierarchical dictionary
#define DBMS_HIERARCHICAL_DICTIONARY_MAX_DEPTH 1000
/// Query profiler cannot work with sanitizers.
/// Sanitizers use quick "frame walking" stack unwinding (this implies -fno-omit-frame-pointer)
/// and they unwind frequently (on every malloc/free, thread/mutex operations, etc).
/// They change %rbp during unwinding, which confuses libunwind if a signal arrives while the sanitizer
/// is unwinding and the query profiler decides to unwind the stack with libunwind at that moment.
///
/// Symptoms: you'll get a silent Segmentation Fault, with no sanitizer message and none of the usual ClickHouse diagnostics.
///
/// Look at compiler-rt/lib/sanitizer_common/sanitizer_stacktrace.h
#if !defined(SANITIZER)
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 1000000000
#else
#define QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS 0
#endif


@ -105,12 +105,16 @@ namespace MySQLReplication
if (query.starts_with("BEGIN") || query.starts_with("COMMIT"))
{
typ = QUERY_EVENT_MULTI_TXN_FLAG;
if (!query.starts_with("COMMIT"))
transaction_complete = false;
}
else if (query.starts_with("XA"))
{
if (query.starts_with("XA ROLLBACK"))
throw ReplicationError("ParseQueryEvent: Unsupported query event:" + query, ErrorCodes::LOGICAL_ERROR);
typ = QUERY_EVENT_XA;
if (!query.starts_with("XA COMMIT"))
transaction_complete = false;
}
else if (query.starts_with("SAVEPOINT"))
{
@ -711,9 +715,26 @@ namespace MySQLReplication
{
switch (event->header.type)
{
case FORMAT_DESCRIPTION_EVENT:
case QUERY_EVENT:
case FORMAT_DESCRIPTION_EVENT: {
binlog_pos = event->header.log_pos;
break;
}
case QUERY_EVENT: {
auto query = std::static_pointer_cast<QueryEvent>(event);
if (query->transaction_complete && pending_gtid)
{
gtid_sets.update(*pending_gtid);
pending_gtid.reset();
}
binlog_pos = event->header.log_pos;
break;
}
case XID_EVENT: {
if (pending_gtid)
{
gtid_sets.update(*pending_gtid);
pending_gtid.reset();
}
binlog_pos = event->header.log_pos;
break;
}
@ -724,9 +745,11 @@ namespace MySQLReplication
break;
}
case GTID_EVENT: {
if (pending_gtid)
gtid_sets.update(*pending_gtid);
auto gtid_event = std::static_pointer_cast<GTIDEvent>(event);
binlog_pos = event->header.log_pos;
gtid_sets.update(gtid_event->gtid);
pending_gtid = gtid_event->gtid;
break;
}
default:
@ -792,6 +815,7 @@ namespace MySQLReplication
{
event = std::make_shared<QueryEvent>(std::move(event_header));
event->parseEvent(event_payload);
position.update(event);
auto query = std::static_pointer_cast<QueryEvent>(event);
switch (query->typ)
@ -803,7 +827,7 @@ namespace MySQLReplication
break;
}
default:
position.update(event);
break;
}
break;
}
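
The position-tracking change above defers adding a GTID to gtid_sets until its transaction is known to be complete (an XID event, or a COMMIT / XA COMMIT query event). A simplified sketch of that bookkeeping, with a plain std::set standing in for the real GTID sets:

#include <optional>
#include <set>
#include <string>

struct BinlogPosition
{
    std::set<std::string> committed_gtids;   /// stands in for gtid_sets
    std::optional<std::string> pending_gtid; /// seen but not yet committed

    void onGtidEvent(const std::string & gtid)
    {
        /// A new GTID arriving before the previous transaction finished
        /// means the previous one did commit: flush it first.
        if (pending_gtid)
            committed_gtids.insert(*pending_gtid);
        pending_gtid = gtid;
    }

    void onTransactionComplete() /// XID event, COMMIT or XA COMMIT
    {
        if (pending_gtid)
        {
            committed_gtids.insert(*pending_gtid);
            pending_gtid.reset();
        }
    }
};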


@ -383,6 +383,7 @@ namespace MySQLReplication
String schema;
String query;
QueryType typ = QUERY_EVENT_DDL;
bool transaction_complete = true;
QueryEvent(EventHeader && header_)
: EventBase(std::move(header_)), thread_id(0), exec_time(0), schema_len(0), error_code(0), status_len(0)
@ -536,6 +537,9 @@ namespace MySQLReplication
void update(BinlogEventPtr event);
void update(UInt64 binlog_pos_, const String & binlog_name_, const String & gtid_sets_);
void dump(WriteBuffer & out) const;
private:
std::optional<GTID> pending_gtid;
};
class IFlavor : public MySQLProtocol::IMySQLReadPacket


@ -265,8 +265,8 @@ class IColumn;
M(Bool, allow_suspicious_codecs, false, "If it is set to true, allow to specify meaningless compression codecs.", 0) \
M(Bool, allow_experimental_codecs, false, "If it is set to true, allow to specify experimental compression codecs (but we don't have those yet and this option does nothing).", 0) \
M(UInt64, odbc_max_field_size, 1024, "Max size of field that can be read from ODBC dictionary. Long strings are truncated.", 0) \
M(UInt64, query_profiler_real_time_period_ns, 1000000000, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, 1000000000, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_real_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for real clock timer of query profiler (in nanoseconds). Set 0 value to turn off the real clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(UInt64, query_profiler_cpu_time_period_ns, QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS, "Period for CPU clock timer of query profiler (in nanoseconds). Set 0 value to turn off the CPU clock query profiler. Recommended value is at least 10000000 (100 times a second) for single queries or 1000000000 (once a second) for cluster-wide profiling.", 0) \
M(Bool, metrics_perf_events_enabled, false, "If enabled, some of the perf events will be measured throughout queries' execution.", 0) \
M(String, metrics_perf_events_list, "", "Comma separated list of perf metrics that will be measured throughout queries' execution. Empty means all events. See PerfEventInfo in sources for the available events.", 0) \
M(Float, opentelemetry_start_trace_probability, 0., "Probability to start an OpenTelemetry trace for an incoming query.", 0) \


@ -6,6 +6,9 @@
#include <base/logger_useful.h>
#include <cstdlib>
namespace
{
/// Detect whether epoll_wait works correctly with nested epoll fds.
/// Polling nested epoll fds from epoll_wait is required for async_socket_for_remote and use_hedged_requests.
///
@ -31,6 +34,15 @@ bool nestedEpollWorks(Poco::Logger * log)
return true;
}
/// See also QUERY_PROFILER_DEFAULT_SAMPLE_RATE_NS in Core/Defines.h
#if !defined(SANITIZER)
bool queryProfilerWorks() { return true; }
#else
bool queryProfilerWorks() { return false; }
#endif
}
namespace DB
{
@ -52,6 +64,22 @@ void applySettingsQuirks(Settings & settings, Poco::Logger * log)
LOG_WARNING(log, "use_hedged_requests has been disabled (you can explicitly enable it still)");
}
}
if (!queryProfilerWorks())
{
if (settings.query_profiler_real_time_period_ns)
{
settings.query_profiler_real_time_period_ns = 0;
if (log)
LOG_WARNING(log, "query_profiler_real_time_period_ns has been disabled (due to server had been compiled with sanitizers)");
}
if (settings.query_profiler_cpu_time_period_ns)
{
settings.query_profiler_cpu_time_period_ns = 0;
if (log)
LOG_WARNING(log, "query_profiler_cpu_time_period_ns has been disabled (due to server had been compiled with sanitizers)");
}
}
}
}


@ -13,3 +13,6 @@ target_link_libraries (mysql_protocol PRIVATE dbms)
if(USE_SSL)
target_include_directories (mysql_protocol SYSTEM PRIVATE ${OPENSSL_INCLUDE_DIR})
endif()
add_executable (coro coro.cpp)
target_link_libraries (coro PRIVATE clickhouse_common_io)

src/Core/examples/coro.cpp (new file)

@ -0,0 +1,189 @@
#include <cassert>
#include <iostream>
#include <string>
#include <optional>
#include <Common/Exception.h>
#include <base/logger_useful.h>
#include <Poco/ConsoleChannel.h>
#include <Poco/Logger.h>
#include <Poco/AutoPtr.h>
#if defined(__clang__)
#include <experimental/coroutine>
namespace std
{
using namespace experimental::coroutines_v1;
}
#else
#include <coroutine>
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wzero-as-null-pointer-constant"
#endif
template <typename T>
struct suspend_value // NOLINT(readability-identifier-naming)
{
constexpr bool await_ready() const noexcept { return true; } // NOLINT(readability-identifier-naming)
constexpr void await_suspend(std::coroutine_handle<>) const noexcept {} // NOLINT(readability-identifier-naming)
constexpr T await_resume() const noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " ret " << val << std::endl;
return val;
}
T val;
};
template <typename T>
struct Task
{
struct promise_type // NOLINT(readability-identifier-naming)
{
using coro_handle = std::coroutine_handle<promise_type>;
auto get_return_object() { return coro_handle::from_promise(*this); } // NOLINT(readability-identifier-naming)
auto initial_suspend() { return std::suspend_never(); } // NOLINT(readability-identifier-naming)
auto final_suspend() noexcept { return suspend_value<T>{*r->value}; } // NOLINT(readability-identifier-naming)
//void return_void() {}
void return_value(T value_) { r->value = value_; } // NOLINT(readability-identifier-naming)
void unhandled_exception() // NOLINT(readability-identifier-naming)
{
DB::tryLogCurrentException("Logger");
r->exception = std::current_exception(); // NOLINT(bugprone-throw-keyword-missing)
}
explicit promise_type(std::string tag_) : tag(tag_) {}
~promise_type() { std::cout << "~promise_type " << tag << std::endl; }
std::string tag;
coro_handle next;
Task * r = nullptr;
};
using coro_handle = std::coroutine_handle<promise_type>;
bool await_ready() const noexcept { return false; } // NOLINT(readability-identifier-naming)
void await_suspend(coro_handle g) noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " await_suspend " << my.promise().tag << std::endl;
std::cout << " g tag " << g.promise().tag << std::endl;
g.promise().next = my;
}
T await_resume() noexcept // NOLINT(readability-identifier-naming)
{
std::cout << " await_res " << my.promise().tag << std::endl;
return *value;
}
Task(coro_handle handle) : my(handle), tag(handle.promise().tag) // NOLINT(google-explicit-constructor)
{
assert(handle);
my.promise().r = this;
std::cout << " Task " << tag << std::endl;
}
Task(Task &) = delete;
Task(Task &&rhs) : my(rhs.my), tag(rhs.tag)
{
rhs.my = {};
std::cout << " Task&& " << tag << std::endl;
}
static bool resumeImpl(Task *r)
{
if (r->value)
return false;
auto & next = r->my.promise().next;
if (next)
{
if (resumeImpl(next.promise().r))
return true;
next = {};
}
if (!r->value)
{
r->my.resume();
if (r->exception)
std::rethrow_exception(r->exception);
}
return !r->value;
}
bool resume()
{
return resumeImpl(this);
}
T res()
{
return *value;
}
~Task()
{
std::cout << " ~Task " << tag << std::endl;
}
private:
coro_handle my;
std::string tag;
std::optional<T> value;
std::exception_ptr exception;
};
Task<int> boo([[maybe_unused]] std::string tag)
{
std::cout << "x" << std::endl;
co_await std::suspend_always();
std::cout << StackTrace().toString();
std::cout << "y" << std::endl;
co_return 1;
}
Task<int> bar([[maybe_unused]] std::string tag)
{
std::cout << "a" << std::endl;
int res1 = co_await boo("boo1");
std::cout << "b " << res1 << std::endl;
int res2 = co_await boo("boo2");
if (res2 == 1)
throw DB::Exception(1, "hello");
std::cout << "c " << res2 << std::endl;
co_return res1 + res2; // 1 + 1 = 2
}
Task<int> foo([[maybe_unused]] std::string tag)
{
std::cout << "Hello" << std::endl;
auto res1 = co_await bar("bar1");
std::cout << "Coro " << res1 << std::endl;
auto res2 = co_await bar("bar2");
std::cout << "World " << res2 << std::endl;
co_return res1 * res2; // 2 * 2 = 4
}
int main()
{
Poco::AutoPtr<Poco::ConsoleChannel> app_channel(new Poco::ConsoleChannel(std::cerr));
Poco::Logger::root().setChannel(app_channel);
Poco::Logger::root().setLevel("trace");
LOG_INFO(&Poco::Logger::get(""), "Starting");
try
{
auto t = foo("foo");
std::cout << ".. started" << std::endl;
while (t.resume())
std::cout << ".. yielded" << std::endl;
std::cout << ".. done: " << t.res() << std::endl;
}
catch (DB::Exception & e)
{
std::cout << "Got exception " << e.what() << std::endl;
std::cout << e.getStackTraceString() << std::endl;
}
}


@ -1,114 +0,0 @@
#include <DataStreams/ColumnGathererStream.h>
#include <base/logger_useful.h>
#include <Common/typeid_cast.h>
#include <Common/formatReadable.h>
#include <IO/WriteHelpers.h>
#include <iomanip>
namespace DB
{
namespace ErrorCodes
{
extern const int INCOMPATIBLE_COLUMNS;
extern const int INCORRECT_NUMBER_OF_COLUMNS;
extern const int EMPTY_DATA_PASSED;
extern const int RECEIVED_EMPTY_DATA;
}
ColumnGathererStream::ColumnGathererStream(
const String & column_name_, const BlockInputStreams & source_streams, ReadBuffer & row_sources_buf_,
size_t block_preferred_size_)
: column_name(column_name_), sources(source_streams.size()), row_sources_buf(row_sources_buf_)
, block_preferred_size(block_preferred_size_), log(&Poco::Logger::get("ColumnGathererStream"))
{
if (source_streams.empty())
throw Exception("There are no streams to gather", ErrorCodes::EMPTY_DATA_PASSED);
children.assign(source_streams.begin(), source_streams.end());
for (size_t i = 0; i < children.size(); ++i)
{
const Block & header = children[i]->getHeader();
/// Sometimes MergeTreeReader injects additional column with partitioning key
if (header.columns() > 2)
throw Exception(
"Block should have 1 or 2 columns, but contains " + toString(header.columns()),
ErrorCodes::INCORRECT_NUMBER_OF_COLUMNS);
if (i == 0)
{
column.name = column_name;
column.type = header.getByName(column_name).type;
column.column = column.type->createColumn();
}
else if (header.getByName(column_name).column->getName() != column.column->getName())
throw Exception("Column types don't match", ErrorCodes::INCOMPATIBLE_COLUMNS);
}
}
Block ColumnGathererStream::readImpl()
{
/// Special case: single source and there are no skipped rows
if (children.size() == 1 && row_sources_buf.eof() && !source_to_fully_copy)
return children[0]->read();
if (!source_to_fully_copy && row_sources_buf.eof())
return Block();
MutableColumnPtr output_column = column.column->cloneEmpty();
output_block = Block{column.cloneEmpty()};
/// Surprisingly this call may directly change output_block, bypassing
/// output_column. See ColumnGathererStream::gather.
output_column->gather(*this);
if (!output_column->empty())
output_block.getByPosition(0).column = std::move(output_column);
return output_block;
}
void ColumnGathererStream::fetchNewBlock(Source & source, size_t source_num)
{
try
{
source.block = children[source_num]->read();
source.update(column_name);
}
catch (Exception & e)
{
e.addMessage("Cannot fetch required block. Stream " + children[source_num]->getName() + ", part " + toString(source_num));
throw;
}
if (0 == source.size)
{
throw Exception("Fetched block is empty. Stream " + children[source_num]->getName() + ", part " + toString(source_num),
ErrorCodes::RECEIVED_EMPTY_DATA);
}
}
void ColumnGathererStream::readSuffixImpl()
{
const BlockStreamProfileInfo & profile_info = getProfileInfo();
/// Don't print info for small parts (< 10M rows)
if (profile_info.rows < 10000000)
return;
double seconds = profile_info.total_stopwatch.elapsedSeconds();
if (!seconds)
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in 0 sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows);
else
LOG_DEBUG(log, "Gathered column {} ({} bytes/elem.) in {} sec., {} rows/sec., {}/sec.",
column_name, static_cast<double>(profile_info.bytes) / profile_info.rows, seconds,
profile_info.rows / seconds, ReadableSize(profile_info.bytes / seconds));
}
}


@ -1,39 +0,0 @@
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/ExpressionBlockInputStream.h>
namespace DB
{
ExpressionBlockInputStream::ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: expression(expression_)
{
children.push_back(input);
cached_header = children.back()->getHeader();
expression->execute(cached_header, true);
}
String ExpressionBlockInputStream::getName() const { return "Expression"; }
Block ExpressionBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->execute(totals);
return totals;
}
Block ExpressionBlockInputStream::getHeader() const
{
return cached_header.cloneEmpty();
}
Block ExpressionBlockInputStream::readImpl()
{
Block res = children.back()->read();
if (res)
expression->execute(res);
return res;
}
}


@ -1,52 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
class ExpressionActions;
/** Executes a certain expression over the block.
* The expression consists of column identifiers from the block, constants, common functions.
* For example: hits * 2 + 3, url LIKE '%yandex%'
* The expression processes each row independently of the others.
*/
class ExpressionBlockInputStream : public IBlockInputStream
{
public:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
ExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
ExpressionActionsPtr expression;
Block readImpl() override;
private:
Block cached_header;
};
/// ExpressionBlockInputStream that could generate many out blocks for single input block.
class InflatingExpressionBlockInputStream : public ExpressionBlockInputStream
{
public:
InflatingExpressionBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_)
: ExpressionBlockInputStream(input, expression_)
{}
protected:
Block readImpl() override;
private:
ExtraBlockPtr not_processed;
size_t action_number = 0;
};
}


@ -1,4 +1,4 @@
#include "InternalTextLogsRowOutputStream.h"
#include "InternalTextLogs.h"
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/typeid_cast.h>
@ -13,12 +13,7 @@
namespace DB
{
Block InternalTextLogsRowOutputStream::getHeader() const
{
return InternalTextLogsQueue::getSampleBlock();
}
void InternalTextLogsRowOutputStream::write(const Block & block)
void InternalTextLogs::write(const Block & block)
{
const auto & array_event_time = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time").column).getData();
const auto & array_microseconds = typeid_cast<const ColumnUInt32 &>(*block.getByName("event_time_microseconds").column).getData();


@ -9,16 +9,15 @@ namespace DB
/// Prints internal server logs
/// Input blocks have to have the same structure as SystemLogsQueue::getSampleBlock()
/// NOTE: IRowOutputFormat does not suit this case well
class InternalTextLogsRowOutputStream : public IBlockOutputStream
class InternalTextLogs
{
public:
InternalTextLogsRowOutputStream(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
InternalTextLogs(WriteBuffer & buf_out, bool color_) : wb(buf_out), color(color_) {}
Block getHeader() const override;
void write(const Block & block) override;
void write(const Block & block);
void flush() override
void flush()
{
wb.next();
}


@ -1,28 +0,0 @@
#include <DataStreams/MaterializingBlockInputStream.h>
#include <DataStreams/materializeBlock.h>
namespace DB
{
MaterializingBlockInputStream::MaterializingBlockInputStream(const BlockInputStreamPtr & input)
{
children.push_back(input);
}
String MaterializingBlockInputStream::getName() const
{
return "Materializing";
}
Block MaterializingBlockInputStream::getHeader() const
{
return materializeBlock(children.back()->getHeader());
}
Block MaterializingBlockInputStream::readImpl()
{
return materializeBlock(children.back()->read());
}
}


@ -1,21 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** Converts columns-constants to full columns ("materializes" them).
*/
class MaterializingBlockInputStream : public IBlockInputStream
{
public:
MaterializingBlockInputStream(const BlockInputStreamPtr & input);
String getName() const override;
Block getHeader() const override;
protected:
Block readImpl() override;
};
}


@ -8,7 +8,7 @@
#include <Common/typeid_cast.h>
#include <base/range.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -23,17 +23,17 @@ namespace ErrorCodes
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_)
NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_)
: istr(istr_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
NativeReader::NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_)
: istr(istr_), header(header_), server_revision(server_revision_)
{
}
NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
NativeReader::NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_)
: istr(istr_), server_revision(server_revision_),
@ -57,21 +57,13 @@ NativeBlockInputStream::NativeBlockInputStream(ReadBuffer & istr_, UInt64 server
}
// Also resets a few vars from IBlockInputStream (I didn't want to propagate resetParser up there)
void NativeBlockInputStream::resetParser()
void NativeReader::resetParser()
{
istr_concrete = nullptr;
use_index = false;
#ifndef NDEBUG
read_prefix_is_called = false;
read_suffix_is_called = false;
#endif
is_cancelled.store(false);
is_killed.store(false);
}
void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
void NativeReader::readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint)
{
ISerialization::DeserializeBinaryBulkSettings settings;
settings.getter = [&](ISerialization::SubstreamPath) -> ReadBuffer * { return &istr; };
@ -91,13 +83,13 @@ void NativeBlockInputStream::readData(const IDataType & type, ColumnPtr & column
}
Block NativeBlockInputStream::getHeader() const
Block NativeReader::getHeader() const
{
return header;
}
Block NativeBlockInputStream::readImpl()
Block NativeReader::read()
{
Block res;
@ -215,7 +207,7 @@ Block NativeBlockInputStream::readImpl()
return res;
}
void NativeBlockInputStream::updateAvgValueSizeHints(const Block & block)
void NativeReader::updateAvgValueSizeHints(const Block & block)
{
auto rows = block.rows();
if (rows < 10)


@ -57,32 +57,28 @@ struct IndexForNativeFormat
* Can also be used to store data on disk.
* In this case, can use the index.
*/
class NativeBlockInputStream : public IBlockInputStream
class NativeReader
{
public:
/// If a non-zero server_revision is specified, additional block information may be expected and read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_);
NativeReader(ReadBuffer & istr_, UInt64 server_revision_);
/// For cases when data structure (header) is known in advance.
/// NOTE We may use header for data validation and/or type conversions. It is not implemented.
NativeBlockInputStream(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
NativeReader(ReadBuffer & istr_, const Block & header_, UInt64 server_revision_);
/// For cases when we have an index. It allows to skip columns. Only columns specified in the index will be read.
NativeBlockInputStream(ReadBuffer & istr_, UInt64 server_revision_,
NativeReader(ReadBuffer & istr_, UInt64 server_revision_,
IndexForNativeFormat::Blocks::const_iterator index_block_it_,
IndexForNativeFormat::Blocks::const_iterator index_block_end_);
String getName() const override { return "Native"; }
static void readData(const IDataType & type, ColumnPtr & column, ReadBuffer & istr, size_t rows, double avg_value_size_hint);
Block getHeader() const override;
Block getHeader() const;
void resetParser();
protected:
Block readImpl() override;
Block read();
private:
ReadBuffer & istr;


@ -6,7 +6,7 @@
#include <Compression/CompressedWriteBuffer.h>
#include <DataStreams/MarkInCompressedFile.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeWriter.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeLowCardinality.h>
@ -20,7 +20,7 @@ namespace ErrorCodes
}
NativeBlockOutputStream::NativeBlockOutputStream(
NativeWriter::NativeWriter(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_,
WriteBuffer * index_ostr_, size_t initial_size_of_file_)
: ostr(ostr_), client_revision(client_revision_), header(header_),
@ -35,7 +35,7 @@ NativeBlockOutputStream::NativeBlockOutputStream(
}
void NativeBlockOutputStream::flush()
void NativeWriter::flush()
{
ostr.next();
}
@ -62,7 +62,7 @@ static void writeData(const IDataType & type, const ColumnPtr & column, WriteBuf
}
void NativeBlockOutputStream::write(const Block & block)
void NativeWriter::write(const Block & block)
{
/// Additional information about the block.
if (client_revision > 0)


@ -1,8 +1,8 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <base/types.h>
#include <DataTypes/IDataType.h>
#include <Core/Block.h>
namespace DB
{
@ -17,20 +17,20 @@ class CompressedWriteBuffer;
* A stream can be specified to write the index. The index contains offsets to each part of each column.
* If an `append` is made to an existing file, and you need to write the index, then specify `initial_size_of_file`.
*/
class NativeBlockOutputStream : public IBlockOutputStream
class NativeWriter
{
public:
/** If non-zero client_revision is specified, additional block information can be written.
*/
NativeBlockOutputStream(
NativeWriter(
WriteBuffer & ostr_, UInt64 client_revision_, const Block & header_, bool remove_low_cardinality_ = false,
WriteBuffer * index_ostr_ = nullptr, size_t initial_size_of_file_ = 0);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
Block getHeader() const { return header; }
void write(const Block & block);
void flush();
String getContentType() const override { return "application/octet-stream"; }
static String getContentType() { return "application/octet-stream"; }
private:
WriteBuffer & ostr;


@ -1,69 +0,0 @@
#include <DataStreams/RemoteBlockInputStream.h>
#include <Interpreters/Context.h>
namespace DB
{
RemoteBlockInputStream::RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(connection, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(pool, std::move(connections), query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
RemoteBlockInputStream::RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_, QueryProcessingStage::Enum stage_)
: query_executor(pool, query_, header_, context_, throttler, scalars_, external_tables_, stage_)
{
init();
}
void RemoteBlockInputStream::init()
{
query_executor.setProgressCallback([this](const Progress & progress) { progressImpl(progress); });
query_executor.setProfileInfoCallback([this](const BlockStreamProfileInfo & info_) { info.setFrom(info_, true); });
query_executor.setLogger(log);
}
void RemoteBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
query_executor.cancel();
}
Block RemoteBlockInputStream::readImpl()
{
auto block = query_executor.read();
if (isCancelledOrThrowIfKilled())
return Block();
return block;
}
void RemoteBlockInputStream::readSuffixImpl()
{
query_executor.finish();
}
}


@ -1,78 +0,0 @@
#pragma once
#include <optional>
#include <base/logger_useful.h>
#include <DataStreams/IBlockInputStream.h>
#include <Common/Throttler.h>
#include <Client/ConnectionPool.h>
#include <Client/MultiplexedConnections.h>
#include <Interpreters/Cluster.h>
#include <DataStreams/RemoteQueryExecutor.h>
namespace DB
{
class Context;
/** This class allows one to launch queries on remote replicas of one shard and get results
*/
class RemoteBlockInputStream : public IBlockInputStream
{
public:
/// Takes already set connection.
RemoteBlockInputStream(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Accepts several connections already taken from pool.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
std::vector<IConnectionPool::Entry> && connections,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Takes a pool and gets one or several connections from it.
RemoteBlockInputStream(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete);
/// Set the query_id. For now, used by performance test to later find the query
/// in the server query_log. Must be called before sending the query to the server.
void setQueryId(const std::string & query_id) { query_executor.setQueryId(query_id); }
/// Specify how we allocate connections on a shard.
void setPoolMode(PoolMode pool_mode) { query_executor.setPoolMode(pool_mode); }
void setMainTable(StorageID main_table_) { query_executor.setMainTable(std::move(main_table_)); }
/// Prevent default progress notification because progress' callback is called by its own.
void progress(const Progress & /*value*/) override {}
void cancel(bool kill) override;
String getName() const override { return "Remote"; }
Block getHeader() const override { return query_executor.getHeader(); }
Block getTotals() override { return query_executor.getTotals(); }
Block getExtremes() override { return query_executor.getExtremes(); }
protected:
Block readImpl() override;
void readSuffixImpl() override;
private:
RemoteQueryExecutor query_executor;
Poco::Logger * log = &Poco::Logger::get("RemoteBlockInputStream");
void init();
};
}


@ -16,18 +16,17 @@
namespace DB
{
TTLBlockInputStream::TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform::TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLBlockInputStream)"))
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLTransform)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@ -50,7 +49,7 @@ TTLBlockInputStream::TTLBlockInputStream(
for (const auto & group_by_ttl : metadata_snapshot_->getGroupByTTLs())
algorithms.emplace_back(std::make_unique<TTLAggregationAlgorithm>(
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, header, storage_));
group_by_ttl, old_ttl_infos.group_by_ttl[group_by_ttl.result_column], current_time_, force_, getInputPort().getHeader(), storage_));
if (metadata_snapshot_->hasAnyColumnTTL())
{
@ -98,22 +97,40 @@ Block reorderColumns(Block block, const Block & header)
return res;
}
Block TTLBlockInputStream::readImpl()
void TTLTransform::consume(Chunk chunk)
{
if (all_data_dropped)
return {};
{
finishConsume();
return;
}
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
auto block = children.at(0)->read();
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
return reorderColumns(std::move(block), header);
size_t num_rows = block.rows();
setReadyChunk(Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows));
}
void TTLBlockInputStream::readSuffixImpl()
Chunk TTLTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
size_t num_rows = block.rows();
return Chunk(reorderColumns(std::move(block), getOutputPort().getHeader()).getColumns(), num_rows);
}
void TTLTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
@ -126,4 +143,13 @@ void TTLBlockInputStream::readSuffixImpl()
}
}
IProcessor::Status TTLTransform::prepare()
{
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}
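
Both TTL processors now share the same accumulating-transform shape: consume() handles each input chunk, generate() drains whatever is still buffered, and prepare() hooks finalization onto the Finished status. A skeletal sketch of that shape with illustrative names:

#include <Processors/IAccumulatingTransform.h>
#include <string>

class MyAccumulatingTransform : public DB::IAccumulatingTransform
{
public:
    explicit MyAccumulatingTransform(const DB::Block & header)
        : DB::IAccumulatingTransform(header, header) {}

    std::string getName() const override { return "MyAccumulating"; }

    Status prepare() override
    {
        auto status = DB::IAccumulatingTransform::prepare();
        if (status == Status::Finished)
            finalize(); /// end-of-stream bookkeeping, as in the TTL transforms
        return status;
    }

protected:
    void consume(DB::Chunk chunk) override
    {
        /// Transform the chunk; accumulated state lives in members.
        setReadyChunk(std::move(chunk));
    }

    DB::Chunk generate() override
    {
        /// Emit anything still buffered; an empty chunk ends the output.
        return {};
    }

private:
    void finalize() { /* flush accumulated state */ }
};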


@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -12,11 +12,11 @@
namespace DB
{
class TTLBlockInputStream : public IBlockInputStream
class TTLTransform : public IAccumulatingTransform
{
public:
TTLBlockInputStream(
const BlockInputStreamPtr & input_,
TTLTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@ -25,13 +25,15 @@ public:
);
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@ -41,7 +43,6 @@ private:
/// ttl_infos and empty_columns are updated while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}


@ -4,18 +4,17 @@
namespace DB
{
TTLCalcInputStream::TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform::TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_,
bool force_)
: data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcInputStream)"))
: IAccumulatingTransform(header_, header_)
, data_part(data_part_)
, log(&Poco::Logger::get(storage_.getLogName() + " (TTLCalcTransform)"))
{
children.push_back(input_);
header = children.at(0)->getHeader();
auto old_ttl_infos = data_part->ttl_infos;
if (metadata_snapshot_->hasRowsTTL())
@ -51,27 +50,52 @@ TTLCalcInputStream::TTLCalcInputStream(
recompression_ttl, TTLUpdateField::RECOMPRESSION_TTL, recompression_ttl.result_column, old_ttl_infos.recompression_ttl[recompression_ttl.result_column], current_time_, force_));
}
Block TTLCalcInputStream::readImpl()
void TTLCalcTransform::consume(Chunk chunk)
{
auto block = children.at(0)->read();
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return block;
return;
Block res;
for (const auto & col : header)
res.insert(block.getByName(col.name));
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
setReadyChunk(std::move(res));
}
Chunk TTLCalcTransform::generate()
{
Block block;
for (const auto & algorithm : algorithms)
algorithm->execute(block);
if (!block)
return {};
Chunk res;
for (const auto & col : getOutputPort().getHeader())
res.addColumn(block.getByName(col.name).column);
return res;
}
void TTLCalcInputStream::readSuffixImpl()
void TTLCalcTransform::finalize()
{
data_part->ttl_infos = {};
for (const auto & algorithm : algorithms)
algorithm->finalize(data_part);
}
IProcessor::Status TTLCalcTransform::prepare()
{
auto status = IAccumulatingTransform::prepare();
if (status == Status::Finished)
finalize();
return status;
}
}


@ -1,5 +1,5 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/IAccumulatingTransform.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
@ -11,11 +11,11 @@
namespace DB
{
class TTLCalcInputStream : public IBlockInputStream
class TTLCalcTransform : public IAccumulatingTransform
{
public:
TTLCalcInputStream(
const BlockInputStreamPtr & input_,
TTLCalcTransform(
const Block & header_,
const MergeTreeData & storage_,
const StorageMetadataPtr & metadata_snapshot_,
const MergeTreeData::MutableDataPartPtr & data_part_,
@ -24,13 +24,14 @@ public:
);
String getName() const override { return "TTL_CALC"; }
Block getHeader() const override { return header; }
Status prepare() override;
protected:
Block readImpl() override;
void consume(Chunk chunk) override;
Chunk generate() override;
/// Finalizes ttl infos and updates data part
void readSuffixImpl() override;
void finalize();
private:
std::vector<TTLAlgorithmPtr> algorithms;
@ -38,7 +39,6 @@ private:
/// ttl_infos and empty_columns are updated while reading
const MergeTreeData::MutableDataPartPtr & data_part;
Poco::Logger * log;
Block header;
};
}


@ -1,7 +1,7 @@
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <DataStreams/copyData.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/ISource.h>
@ -17,13 +17,13 @@ namespace DB
TemporaryFileStream::TemporaryFileStream(const std::string & path)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
, block_in(std::make_unique<NativeReader>(compressed_in, DBMS_TCP_PROTOCOL_VERSION))
{}
TemporaryFileStream::TemporaryFileStream(const std::string & path, const Block & header_)
: file_in(path)
, compressed_in(file_in)
, block_in(std::make_shared<NativeBlockInputStream>(compressed_in, header_, 0))
, block_in(std::make_unique<NativeReader>(compressed_in, header_, 0))
{}
/// Flush data from input stream into file for future reading
@ -31,18 +31,15 @@ void TemporaryFileStream::write(const std::string & path, const Block & header,
{
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf, CompressionCodecFactory::instance().get(codec, {}));
NativeBlockOutputStream output(compressed_buf, 0, header);
NativeWriter output(compressed_buf, 0, header);
auto pipeline = QueryPipelineBuilder::getPipeline(std::move(builder));
PullingPipelineExecutor executor(pipeline);
output.writePrefix();
Block block;
while (executor.pull(block))
output.write(block);
output.writeSuffix();
compressed_buf.finalize();
}


@ -5,6 +5,7 @@
#include <Compression/CompressedReadBuffer.h>
#include <IO/ReadBufferFromFile.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/NativeReader.h>
namespace DB
{
@ -14,7 +15,7 @@ struct TemporaryFileStream
{
ReadBufferFromFile file_in;
CompressedReadBuffer compressed_in;
BlockInputStreamPtr block_in;
std::unique_ptr<NativeReader> block_in;
explicit TemporaryFileStream(const std::string & path);
TemporaryFileStream(const std::string & path, const Block & header_);

Some files were not shown because too many files have changed in this diff.