Merge branch 'master' into mysql

This commit is contained in:
alexey-milovidov 2019-08-04 06:41:58 +03:00 committed by GitHub
commit 9a695a4113
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
349 changed files with 7203 additions and 2365 deletions

5
.gitmodules vendored
View File

@ -93,10 +93,13 @@
url = https://github.com/ClickHouse-Extras/libunwind.git
[submodule "contrib/simdjson"]
path = contrib/simdjson
url = https://github.com/lemire/simdjson.git
url = https://github.com/ClickHouse-Extras/simdjson.git
[submodule "contrib/rapidjson"]
path = contrib/rapidjson
url = https://github.com/Tencent/rapidjson
[submodule "contrib/mimalloc"]
path = contrib/mimalloc
url = https://github.com/ClickHouse-Extras/mimalloc
[submodule "contrib/fastops"]
path = contrib/fastops
url = https://github.com/ClickHouse-Extras/fastops

View File

@ -106,6 +106,10 @@ endif ()
if (COMPILER_CLANG)
# clang: warning: argument unused during compilation: '-specs=/usr/share/dpkg/no-pie-compile.specs' [-Wunused-command-line-argument]
set (COMMON_WARNING_FLAGS "${COMMON_WARNING_FLAGS} -Wno-unused-command-line-argument")
# generate ranges for fast "addr2line" search
if (NOT CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE")
set(COMPILER_FLAGS "${COMPILER_FLAGS} -gdwarf-aranges")
endif ()
endif ()
option (ENABLE_TESTS "Enables tests" ON)
@ -190,10 +194,13 @@ endif ()
option(WITH_COVERAGE "Build with coverage." 0)
if(WITH_COVERAGE AND COMPILER_CLANG)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
# If we want to disable coverage for specific translation units
set(WITHOUT_COVERAGE "-fno-profile-instr-generate -fno-coverage-mapping")
endif()
if(WITH_COVERAGE AND COMPILER_GCC)
set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-arcs -ftest-coverage")
set(COVERAGE_OPTION "-lgcov")
set(WITHOUT_COVERAGE "-fno-profile-arcs -fno-test-coverage")
endif()
set (CMAKE_BUILD_COLOR_MAKEFILE ON)
@ -255,10 +262,10 @@ if (USE_STATIC_LIBRARIES AND HAVE_NO_PIE)
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${FLAG_NO_PIE}")
endif ()
if (NOT SANITIZE)
if (NOT SANITIZE AND NOT SPLIT_SHARED_LIBRARIES)
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -Wl,--no-undefined")
set (CMAKE_SHARED_LINKER_FLAGS "${CMAKE_SHARED_LINKER_FLAGS} -Wl,--no-undefined")
endif()
endif ()
include (cmake/find_unwind.cmake)
@ -304,8 +311,11 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
# There are two variants of C++ library: libc++ (from LLVM compiler infrastructure) and libstdc++ (from GCC).
if (USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
# TODO: Allow to use non-static library as well.
set (EXCEPTION_HANDLING_LIBRARY "${ClickHouse_BINARY_DIR}/contrib/libunwind-cmake/libunwind_static${${CMAKE_POSTFIX_VARIABLE}}.a")
if (USE_STATIC_LIBRARIES)
set (EXCEPTION_HANDLING_LIBRARY "${ClickHouse_BINARY_DIR}/contrib/libunwind-cmake/libunwind_static${${CMAKE_POSTFIX_VARIABLE}}.a")
else ()
set (EXCEPTION_HANDLING_LIBRARY "${ClickHouse_BINARY_DIR}/contrib/libunwind-cmake/libunwind_shared${${CMAKE_POSTFIX_VARIABLE}}.so")
endif ()
else ()
set (EXCEPTION_HANDLING_LIBRARY "-lgcc_eh")
endif ()
@ -321,7 +331,7 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic ${LIBCXX_LIBS} ${EXCEPTION_HANDLING_LIBRARY} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
else ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lstdc++ ${EXCEPTION_HANDLING_LIBRARY} ${COVERAGE_OPTION} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
set (DEFAULT_LIBS "${DEFAULT_LIBS} -Wl,-Bstatic -lstdc++ -lstdc++fs ${EXCEPTION_HANDLING_LIBRARY} ${COVERAGE_OPTION} ${BUILTINS_LIB_PATH} -Wl,-Bdynamic")
endif ()
# Linking with GLIBC prevents portability of binaries to older systems.
@ -345,6 +355,10 @@ if (OS_LINUX AND NOT UNBUNDLED AND (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_L
message(STATUS "Default libraries: ${DEFAULT_LIBS}")
endif ()
if (NOT GLIBC_COMPATIBILITY)
set (M_LIBRARY m)
endif ()
if (DEFAULT_LIBS)
# Add default libs to all targets as the last dependency.
set(CMAKE_CXX_STANDARD_LIBRARIES ${DEFAULT_LIBS})
@ -462,6 +476,7 @@ include (cmake/find_hyperscan.cmake)
include (cmake/find_mimalloc.cmake)
include (cmake/find_simdjson.cmake)
include (cmake/find_rapidjson.cmake)
include (cmake/find_fastops.cmake)
find_contrib_lib(cityhash)
find_contrib_lib(farmhash)
@ -554,4 +569,5 @@ if (GLIBC_COMPATIBILITY OR USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
add_default_dependencies(base64)
add_default_dependencies(readpassphrase)
add_default_dependencies(unwind_static)
add_default_dependencies(fastops)
endif ()

View File

@ -13,6 +13,7 @@ ClickHouse is an open-source column-oriented database management system that all
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in Saint Petersburg](https://yandex.ru/promo/clickhouse/saint-petersburg-2019) on July 27.
* [ClickHouse Meetup in Mountain View](https://www.eventbrite.com/e/meetup-clickhouse-in-the-south-bay-registration-65935505873) on August 13.
* [ClickHouse Meetup in Moscow](https://yandex.ru/promo/clickhouse/moscow-2019) on September 5.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.

View File

@ -129,7 +129,7 @@ find_package_handle_standard_args(ODBC
)
if(ODBC_FOUND)
set(ODBC_LIBRARIES ${ODBC_LIBRARY} ${_odbc_required_libs_paths})
set(ODBC_LIBRARIES ${ODBC_LIBRARY} ${_odbc_required_libs_paths} ${LTDL_LIBRARY})
set(ODBC_INCLUDE_DIRS ${ODBC_INCLUDE_DIR})
set(ODBC_DEFINITIONS ${PC_ODBC_CFLAGS_OTHER})
endif()

15
cmake/find_fastops.cmake Normal file
View File

@ -0,0 +1,15 @@
option (ENABLE_FASTOPS "Enable fast vectorized mathematical functions library by Michael Parakhin" ${NOT_UNBUNDLED})
if (ENABLE_FASTOPS)
if(NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/fastops/fastops/fastops.h")
message(FATAL_ERROR "submodule contrib/fastops is missing. to fix try run: \n git submodule update --init --recursive")
set(USE_FASTOPS 0)
endif()
set (USE_FASTOPS 1)
set (FASTOPS_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/fastops/)
set (FASTOPS_LIBRARY fastops)
else ()
set(USE_FASTOPS 0)
endif ()
message (STATUS "Using fastops")

View File

@ -16,7 +16,12 @@ list(APPEND dirs ${dirs1})
get_property (dirs1 TARGET roaring PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
if (USE_INTERNAL_BOOST_LIBRARY)
if (TARGET double-conversion)
get_property (dirs1 TARGET double-conversion PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
endif ()
if (TARGET ${Boost_PROGRAM_OPTIONS_LIBRARY})
get_property (dirs1 TARGET ${Boost_PROGRAM_OPTIONS_LIBRARY} PROPERTY INCLUDE_DIRECTORIES)
list(APPEND dirs ${dirs1})
endif ()

View File

@ -330,3 +330,7 @@ endif()
if (USE_MIMALLOC)
add_subdirectory (mimalloc)
endif()
if (USE_FASTOPS)
add_subdirectory (fastops-cmake)
endif()

View File

@ -44,6 +44,7 @@ set( thriftcpp_threads_SOURCES
add_library(${THRIFT_LIBRARY} ${thriftcpp_SOURCES} ${thriftcpp_threads_SOURCES})
set_target_properties(${THRIFT_LIBRARY} PROPERTIES CXX_STANDARD 14) # REMOVE after https://github.com/apache/thrift/pull/1641
target_include_directories(${THRIFT_LIBRARY} SYSTEM PUBLIC ${ClickHouse_SOURCE_DIR}/contrib/thrift/lib/cpp/src PRIVATE ${Boost_INCLUDE_DIRS})
target_link_libraries(${THRIFT_LIBRARY} PRIVATE Threads::Threads)

View File

@ -31,3 +31,7 @@ set(SRCS
add_library(brotli ${SRCS})
target_include_directories(brotli PUBLIC ${BROTLI_SOURCE_DIR}/include)
if(M_LIBRARY)
target_link_libraries(brotli PRIVATE ${M_LIBRARY})
endif()

View File

@ -10,5 +10,4 @@ ${LIBRARY_DIR}/double-conversion/fast-dtoa.cc
${LIBRARY_DIR}/double-conversion/fixed-dtoa.cc
${LIBRARY_DIR}/double-conversion/strtod.cc)
target_include_directories(double-conversion SYSTEM PUBLIC "${LIBRARY_DIR}")
target_include_directories(double-conversion SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}")

1
contrib/fastops vendored Submodule

@ -0,0 +1 @@
Subproject commit d2c85c5d6549cfd648a7f31ef7b14341881ff8ae

View File

@ -0,0 +1,20 @@
set(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/fastops)
set(SRCS "")
if(HAVE_AVX)
set (SRCS ${SRCS} ${LIBRARY_DIR}/fastops/avx/ops_avx.cpp ${LIBRARY_DIR}/fastops/core/FastIntrinsics.cpp)
set_source_files_properties(${LIBRARY_DIR}/fastops/avx/ops_avx.cpp PROPERTIES COMPILE_FLAGS "-mavx -DNO_AVX2")
set_source_files_properties(${LIBRARY_DIR}/fastops/core/FastIntrinsics.cpp PROPERTIES COMPILE_FLAGS "-mavx -DNO_AVX2")
endif()
if(HAVE_AVX2)
set (SRCS ${SRCS} ${LIBRARY_DIR}/fastops/avx2/ops_avx2.cpp)
set_source_files_properties(${LIBRARY_DIR}/fastops/avx2/ops_avx2.cpp PROPERTIES COMPILE_FLAGS "-mavx2 -mfma")
endif()
set (SRCS ${SRCS} ${LIBRARY_DIR}/fastops/plain/ops_plain.cpp ${LIBRARY_DIR}/fastops/core/avx_id.cpp ${LIBRARY_DIR}/fastops/fastops.cpp)
add_library(fastops ${SRCS})
target_include_directories(fastops SYSTEM PUBLIC "${LIBRARY_DIR}")

View File

@ -25,3 +25,6 @@ add_library(h3 ${SRCS})
target_include_directories(h3 SYSTEM PUBLIC ${H3_SOURCE_DIR}/include)
target_include_directories(h3 SYSTEM PUBLIC ${H3_BINARY_DIR}/include)
target_compile_definitions(h3 PRIVATE H3_HAVE_VLA)
if(M_LIBRARY)
target_link_libraries(h3 PRIVATE ${M_LIBRARY})
endif()

2
contrib/hyperscan vendored

@ -1 +1 @@
Subproject commit 01e6b83f9fbdb4020cd68a5287bf3a0471eeb272
Subproject commit 3058c9c20cba3accdf92544d8513a26240c4ff70

View File

@ -65,7 +65,7 @@ add_library(rdkafka ${SRCS})
target_include_directories(rdkafka SYSTEM PUBLIC include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) # Because weird logic with "include_next" is used.
target_include_directories(rdkafka SYSTEM PRIVATE ${ZSTD_INCLUDE_DIR}/common) # Because wrong path to "zstd_errors.h" is used.
target_link_libraries(rdkafka PUBLIC ${ZLIB_LIBRARIES} ${ZSTD_LIBRARY} ${LZ4_LIBRARY} ${LIBGSASL_LIBRARY})
target_link_libraries(rdkafka PRIVATE ${ZLIB_LIBRARIES} ${ZSTD_LIBRARY} ${LZ4_LIBRARY} ${LIBGSASL_LIBRARY} Threads::Threads)
if(OPENSSL_SSL_LIBRARY AND OPENSSL_CRYPTO_LIBRARY)
target_link_libraries(rdkafka PUBLIC ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
target_link_libraries(rdkafka PRIVATE ${OPENSSL_SSL_LIBRARY} ${OPENSSL_CRYPTO_LIBRARY})
endif()

View File

@ -26,6 +26,7 @@ set(LIBUNWIND_SOURCES
add_library(unwind_static ${LIBUNWIND_SOURCES})
target_include_directories(unwind_static PUBLIC ${LIBUNWIND_SOURCE_DIR}/include)
target_include_directories(unwind_static SYSTEM BEFORE PUBLIC ${LIBUNWIND_SOURCE_DIR}/include)
target_compile_definitions(unwind_static PRIVATE -D_LIBUNWIND_NO_HEAP=1 -D_DEBUG -D_LIBUNWIND_IS_NATIVE_ONLY)
target_compile_options(unwind_static PRIVATE -fno-exceptions -funwind-tables -fno-sanitize=all -nostdinc++ -fno-rtti)
target_link_libraries(unwind_static PRIVATE Threads::Threads ${CMAKE_DL_LIBS})

View File

@ -52,7 +52,10 @@ set(SRCS
)
add_library(libxml2 ${SRCS})
target_link_libraries(libxml2 ${ZLIB_LIBRARIES})
target_link_libraries(libxml2 PRIVATE ${ZLIB_LIBRARIES} ${CMAKE_DL_LIBS})
if(M_LIBRARY)
target_link_libraries(libxml2 PRIVATE ${M_LIBRARY})
endif()
target_include_directories(libxml2 PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/include)
target_include_directories(libxml2 PUBLIC ${LIBXML2_SOURCE_DIR}/include)

View File

@ -60,6 +60,11 @@ endif()
add_library(mysqlclient ${SRCS})
target_link_libraries(mysqlclient PRIVATE ${CMAKE_DL_LIBS} Threads::Threads)
if(M_LIBRARY)
target_link_libraries(mysqlclient PRIVATE ${M_LIBRARY})
endif()
if(OPENSSL_LIBRARIES)
target_link_libraries(mysqlclient PRIVATE ${OPENSSL_LIBRARIES})
target_compile_definitions(mysqlclient PRIVATE -D HAVE_OPENSSL -D HAVE_TLS)

2
contrib/simdjson vendored

@ -1 +1 @@
Subproject commit 3bd3116cf8faf6d482dc31423b16533bfa2696f7
Subproject commit e3f6322af762213ff2087ce3366bf9541c7fd355

View File

@ -32,6 +32,7 @@ target_include_directories(ltdl PUBLIC ${ODBC_SOURCE_DIR}/libltdl/libltdl)
target_compile_definitions(ltdl PRIVATE -DHAVE_CONFIG_H -DLTDL -DLTDLOPEN=libltdlc)
target_compile_options(ltdl PRIVATE -Wno-constant-logical-operand -Wno-unknown-warning-option -O2)
target_link_libraries(ltdl PRIVATE ${CMAKE_DL_LIBS})
set(SRCS

View File

@ -159,6 +159,12 @@ if (OS_FREEBSD)
target_compile_definitions (clickhouse_common_io PUBLIC CLOCK_MONOTONIC_COARSE=CLOCK_MONOTONIC_FAST)
endif ()
if (USE_UNWIND)
if (NOT USE_INTERNAL_UNWIND_LIBRARY_FOR_EXCEPTION_HANDLING)
target_link_libraries (clickhouse_common_io PRIVATE ${UNWIND_LIBRARY})
endif ()
endif ()
add_subdirectory(src/Common/ZooKeeper)
add_subdirectory(src/Common/Config)
@ -195,6 +201,13 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELW
PROPERTIES COMPILE_FLAGS -g0)
endif ()
# Otherwise it will slow down stack traces printing too much.
set_source_files_properties(
src/Common/Elf.cpp
src/Common/Dwarf.cpp
src/Common/SymbolIndex.cpp
PROPERTIES COMPILE_FLAGS "-O3 ${WITHOUT_COVERAGE}")
target_link_libraries (clickhouse_common_io
PUBLIC
common
@ -237,10 +250,6 @@ target_link_libraries(clickhouse_common_io
roaring
)
if(ZSTD_LIBRARY)
target_link_libraries(clickhouse_common_io PUBLIC ${ZSTD_LIBRARY})
endif()
if (USE_RDKAFKA)
target_link_libraries(dbms PRIVATE ${CPPKAFKA_LIBRARY} ${RDKAFKA_LIBRARY})
if(NOT USE_INTERNAL_RDKAFKA_LIBRARY)
@ -296,11 +305,14 @@ target_include_directories(dbms SYSTEM PUBLIC ${PCG_RANDOM_INCLUDE_DIR})
if (NOT USE_INTERNAL_LZ4_LIBRARY)
target_include_directories(dbms SYSTEM BEFORE PRIVATE ${LZ4_INCLUDE_DIR})
endif ()
if (ZSTD_LIBRARY)
target_link_libraries(dbms PRIVATE ${ZSTD_LIBRARY})
endif()
if (NOT USE_INTERNAL_ZSTD_LIBRARY AND ZSTD_INCLUDE_DIR)
target_include_directories(dbms SYSTEM BEFORE PRIVATE ${ZSTD_INCLUDE_DIR})
endif ()
if (NOT USE_INTERNAL_BOOST_LIBRARY)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PUBLIC ${Boost_INCLUDE_DIRS})
endif ()

View File

@ -9,10 +9,11 @@
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <boost/filesystem.hpp>
#include <filesystem>
#include "executeQuery.h"
namespace DB
{
@ -48,7 +49,7 @@ void waitQuery(Connection & connection)
}
}
namespace fs = boost::filesystem;
namespace fs = std::filesystem;
PerformanceTest::PerformanceTest(
const XMLConfigurationPtr & config_,

View File

@ -3,10 +3,11 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteBufferFromFile.h>
#include <boost/filesystem.hpp>
#include "applySubstitutions.h"
#include <filesystem>
#include <iostream>
namespace DB
{
namespace ErrorCodes
@ -39,7 +40,7 @@ void extractSettings(
}
namespace fs = boost::filesystem;
namespace fs = std::filesystem;
PerformanceTestInfo::PerformanceTestInfo(
XMLConfigurationPtr config,

View File

@ -4,11 +4,11 @@
#include <regex>
#include <thread>
#include <memory>
#include <filesystem>
#include <port/unistd.h>
#include <sys/stat.h>
#include <boost/filesystem.hpp>
#include <boost/program_options.hpp>
#include <Poco/AutoPtr.h>
@ -36,7 +36,7 @@
#include "ReportBuilder.h"
namespace fs = boost::filesystem;
namespace fs = std::filesystem;
namespace po = boost::program_options;
namespace DB

View File

@ -8,6 +8,7 @@
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/getFQDNOrHostName.h>
#include <common/getMemoryAmount.h>
#include <Common/StringUtils/StringUtils.h>
#include "JSONString.h"
@ -29,6 +30,10 @@ std::string getMainMetric(const PerformanceTestInfo & test_info)
main_metric = test_info.main_metric;
return main_metric;
}
bool isASCIIString(const std::string & str)
{
return std::all_of(str.begin(), str.end(), isASCII);
}
}
ReportBuilder::ReportBuilder(const std::string & server_version_)
@ -109,7 +114,12 @@ std::string ReportBuilder::buildFullReport(
runJSON.set("query", query);
runJSON.set("query_index", query_index);
if (!statistics.exception.empty())
runJSON.set("exception", statistics.exception);
{
if (isASCIIString(statistics.exception))
runJSON.set("exception", std::regex_replace(statistics.exception, QUOTE_REGEX, "\\\""));
else
runJSON.set("exception", "Some exception occured with non ASCII message. This may produce invalid JSON. Try reproduce locally.");
}
if (test_info.exec_type == ExecutionType::Loop)
{

View File

@ -61,6 +61,9 @@ namespace ErrorCodes
extern const int SYNTAX_ERROR;
extern const int INCORRECT_DATA;
extern const int TYPE_MISMATCH;
extern const int UNKNOWN_TABLE;
extern const int UNKNOWN_FUNCTION;
extern const int UNKNOWN_IDENTIFIER;
@ -99,15 +102,18 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
exception_code == ErrorCodes::CANNOT_PARSE_QUOTED_STRING ||
exception_code == ErrorCodes::CANNOT_PARSE_DATE ||
exception_code == ErrorCodes::CANNOT_PARSE_DATETIME ||
exception_code == ErrorCodes::CANNOT_PARSE_NUMBER)
return HTTPResponse::HTTP_BAD_REQUEST;
else if (exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST ||
exception_code == ErrorCodes::CANNOT_PARSE_NUMBER ||
exception_code == ErrorCodes::UNKNOWN_ELEMENT_IN_AST ||
exception_code == ErrorCodes::UNKNOWN_TYPE_OF_AST_NODE ||
exception_code == ErrorCodes::TOO_DEEP_AST ||
exception_code == ErrorCodes::TOO_BIG_AST ||
exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE)
return HTTPResponse::HTTP_BAD_REQUEST;
else if (exception_code == ErrorCodes::SYNTAX_ERROR)
exception_code == ErrorCodes::UNEXPECTED_AST_STRUCTURE ||
exception_code == ErrorCodes::SYNTAX_ERROR ||
exception_code == ErrorCodes::INCORRECT_DATA ||
exception_code == ErrorCodes::TYPE_MISMATCH)
return HTTPResponse::HTTP_BAD_REQUEST;
else if (exception_code == ErrorCodes::UNKNOWN_TABLE ||
exception_code == ErrorCodes::UNKNOWN_FUNCTION ||
@ -119,9 +125,9 @@ static Poco::Net::HTTPResponse::HTTPStatus exceptionCodeToHTTPStatus(int excepti
exception_code == ErrorCodes::UNKNOWN_DIRECTION_OF_SORTING ||
exception_code == ErrorCodes::UNKNOWN_AGGREGATE_FUNCTION ||
exception_code == ErrorCodes::UNKNOWN_FORMAT ||
exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE)
return HTTPResponse::HTTP_NOT_FOUND;
else if (exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY)
exception_code == ErrorCodes::UNKNOWN_DATABASE_ENGINE ||
exception_code == ErrorCodes::UNKNOWN_TYPE_OF_QUERY)
return HTTPResponse::HTTP_NOT_FOUND;
else if (exception_code == ErrorCodes::QUERY_IS_TOO_LARGE)
return HTTPResponse::HTTP_REQUESTENTITYTOOLARGE;

View File

@ -15,6 +15,7 @@
#include <ext/scope_guard.h>
#include <common/logger_useful.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/ErrorHandlers.h>
#include <common/getMemoryAmount.h>
#include <Common/ClickHouseRevision.h>
@ -510,8 +511,12 @@ int Server::main(const std::vector<std::string> & /*args*/)
LOG_DEBUG(log, "Loaded metadata.");
/// Init trace collector only after trace_log system table was created
/// Disable it if we collect test coverage information, because it will work extremely slow.
#if USE_INTERNAL_UNWIND_LIBRARY && !WITH_COVERAGE
/// QueryProfiler cannot work reliably with any other libunwind or without PHDR cache.
if (hasPHDRCache())
global_context->initializeTraceCollector();
#endif
global_context->setCurrentDatabase(default_database);
@ -626,161 +631,164 @@ int Server::main(const std::vector<std::string> & /*args*/)
for (const auto & listen_host : listen_hosts)
{
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
uint16_t listen_port = 0;
try
auto create_server = [&](const char * port_name, auto && func)
{
/// HTTP
if (config().has("http_port"))
{
Poco::Net::ServerSocket socket;
listen_port = config().getInt("http_port");
auto address = socket_bind_listen(socket, listen_host, listen_port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
socket,
http_params));
/// For testing purposes, user may omit tcp_port or http_port or https_port in configuration file.
if (!config().has(port_name))
return;
LOG_INFO(log, "Listening http://" + address.toString());
auto port = config().getInt(port_name);
try
{
func(port);
}
/// HTTPS
if (config().has("https_port"))
catch (const Poco::Exception &)
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
listen_port = config().getInt("https_port");
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
server_pool,
socket,
http_params));
std::string message = "Listen [" + listen_host + "]:" + std::to_string(port) + " failed: " + getCurrentExceptionMessage(false);
LOG_INFO(log, "Listening https://" + address.toString());
if (listen_try)
{
LOG_ERROR(log, message
<< ". If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>");
}
else
{
throw Exception{message, ErrorCodes::NETWORK_ERROR};
}
}
};
/// HTTP
create_server("http_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening http://" + address.toString());
});
/// HTTPS
create_server("https_port", [&](UInt16 port)
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new HTTPHandlerFactory(*this, "HTTPSHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening https://" + address.toString());
#else
throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
throw Exception{"HTTPS protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});
/// TCP
create_server("tcp_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with native protocol (tcp): " + address.toString());
});
/// TCP with SSL
create_server("tcp_port_secure", [&](UInt16 port)
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure= */ true),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): " + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
});
/// Interserver IO HTTP
create_server("interserver_http_port", [&](UInt16 port)
{
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for replica communication (interserver) http://" + address.toString());
});
create_server("interserver_https_port", [&](UInt16 port)
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver) https://" + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
});
/// TCP
if (config().has("tcp_port"))
{
Poco::Net::ServerSocket socket;
listen_port = config().getInt("tcp_port");
auto address = socket_bind_listen(socket, listen_host, listen_port);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with native protocol (tcp): " + address.toString());
}
/// TCP with SSL
if (config().has("tcp_port_secure"))
{
create_server("mysql_port", [&](UInt16 port)
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
listen_port = config().getInt("tcp_port_secure");
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new TCPHandlerFactory(*this, /* secure= */ true),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for connections with secure native protocol (tcp_secure): " + address.toString());
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, port, /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
throw Exception{"SSL support for MySQL protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
/// At least one of TCP and HTTP servers must be created.
if (servers.empty())
throw Exception("No 'tcp_port' and 'http_port' is specified in configuration file.", ErrorCodes::NO_ELEMENTS_IN_CONFIG);
/// Interserver IO HTTP
if (config().has("interserver_http_port"))
{
Poco::Net::ServerSocket socket;
listen_port = config().getInt("interserver_http_port");
auto address = socket_bind_listen(socket, listen_host, listen_port);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for replica communication (interserver) http://" + address.toString());
}
if (config().has("interserver_https_port"))
{
#if USE_POCO_NETSSL
Poco::Net::SecureServerSocket socket;
listen_port = config().getInt("interserver_https_port");
auto address = socket_bind_listen(socket, listen_host, listen_port, /* secure = */ true);
socket.setReceiveTimeout(settings.http_receive_timeout);
socket.setSendTimeout(settings.http_send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::HTTPServer>(
new InterserverIOHTTPHandlerFactory(*this, "InterserverIOHTTPHandler-factory"),
server_pool,
socket,
http_params));
LOG_INFO(log, "Listening for secure replica communication (interserver) https://" + address.toString());
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
if (config().has("mysql_port"))
{
#if USE_POCO_NETSSL
Poco::Net::ServerSocket socket;
auto address = socket_bind_listen(socket, listen_host, config().getInt("mysql_port"), /* secure = */ true);
socket.setReceiveTimeout(Poco::Timespan());
socket.setSendTimeout(settings.send_timeout);
servers.emplace_back(std::make_unique<Poco::Net::TCPServer>(
new MySQLHandlerFactory(*this),
server_pool,
socket,
new Poco::Net::TCPServerParams));
LOG_INFO(log, "Listening for MySQL compatibility protocol: " + address.toString());
#else
throw Exception{"SSL support for MySQL protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
}
catch (const Poco::Exception & e)
{
std::string message = "Listen [" + listen_host + "]:" + std::to_string(listen_port) + " failed: " + std::to_string(e.code()) + ": " + e.what() + ": " + e.message();
if (listen_try)
LOG_ERROR(log, message
<< " If it is an IPv6 or IPv4 address and your host has disabled IPv6 or IPv4, then consider to "
"specify not disabled IPv4 or IPv6 address to listen in <listen_host> element of configuration "
"file. Example for disabled IPv6: <listen_host>0.0.0.0</listen_host> ."
" Example for disabled IPv4: <listen_host>::</listen_host>");
else
throw Exception{message, ErrorCodes::NETWORK_ERROR};
}
});
}
if (servers.empty())
@ -818,10 +826,13 @@ int Server::main(const std::vector<std::string> & /*args*/)
current_connections += server->currentConnections();
}
LOG_DEBUG(log,
LOG_INFO(log,
"Closed all listening sockets."
<< (current_connections ? " Waiting for " + toString(current_connections) + " outstanding connections." : ""));
/// Killing remaining queries.
global_context->getProcessList().killAllQueries();
if (current_connections)
{
const int sleep_max_ms = 1000 * config().getInt("shutdown_wait_unfinished", 5);
@ -839,13 +850,24 @@ int Server::main(const std::vector<std::string> & /*args*/)
}
}
LOG_DEBUG(
LOG_INFO(
log, "Closed connections." << (current_connections ? " But " + toString(current_connections) + " remains."
" Tip: To increase wait time add to config: <shutdown_wait_unfinished>60</shutdown_wait_unfinished>" : ""));
dns_cache_updater.reset();
main_config_reloader.reset();
users_config_reloader.reset();
if (current_connections)
{
/// There is no better way to force connections to close in Poco.
/// Otherwise connection handlers will continue to live
/// (they are effectively dangling objects, but they use global thread pool
/// and global thread pool destructor will wait for threads, preventing server shutdown).
LOG_INFO(log, "Will shutdown forcefully.");
_exit(Application::EXIT_OK);
}
});
/// try to load dictionaries immediately, throw on error and die

View File

@ -59,10 +59,13 @@ void TCPHandler::runImpl()
connection_context = server.context();
connection_context.makeSessionContext();
Settings global_settings = connection_context.getSettings();
/// These timeouts can be changed after receiving query.
socket().setReceiveTimeout(global_settings.receive_timeout);
socket().setSendTimeout(global_settings.send_timeout);
auto global_receive_timeout = connection_context.getSettingsRef().receive_timeout;
auto global_send_timeout = connection_context.getSettingsRef().send_timeout;
socket().setReceiveTimeout(global_receive_timeout);
socket().setSendTimeout(global_send_timeout);
socket().setNoDelay(true);
in = std::make_shared<ReadBufferFromPocoSocket>(socket());
@ -74,6 +77,7 @@ void TCPHandler::runImpl()
return;
}
/// User will be authenticated here. It will also set settings from user profile into connection_context.
try
{
receiveHello();
@ -117,6 +121,8 @@ void TCPHandler::runImpl()
connection_context.setCurrentDatabase(default_database);
}
Settings connection_settings = connection_context.getSettings();
sendHello();
connection_context.setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
@ -126,9 +132,10 @@ void TCPHandler::runImpl()
/// We are waiting for a packet from the client. Thus, every `poll_interval` seconds check whether we need to shut down.
{
Stopwatch idle_time;
while (!static_cast<ReadBufferFromPocoSocket &>(*in).poll(global_settings.poll_interval * 1000000) && !server.isCancelled())
while (!server.isCancelled() && !static_cast<ReadBufferFromPocoSocket &>(*in).poll(
std::min(connection_settings.poll_interval, connection_settings.idle_connection_timeout) * 1000000))
{
if (idle_time.elapsedSeconds() > global_settings.idle_connection_timeout)
if (idle_time.elapsedSeconds() > connection_settings.idle_connection_timeout)
{
LOG_TRACE(log, "Closing idle connection");
return;
@ -182,13 +189,13 @@ void TCPHandler::runImpl()
CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
}
query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)
query_context->setExternalTablesInitializer([&connection_settings, this] (Context & context)
{
if (&context != &*query_context)
throw Exception("Unexpected context in external tables initializer", ErrorCodes::LOGICAL_ERROR);
/// Get blocks of temporary tables
readData(global_settings);
readData(connection_settings);
/// Reset the input stream, as we received an empty block while receiving external table data.
/// So, the stream has been marked as cancelled and we can't read from it anymore.
@ -210,7 +217,7 @@ void TCPHandler::runImpl()
/// Does the request require receive data from client?
if (state.need_receive_data_for_insert)
processInsertQuery(global_settings);
processInsertQuery(connection_settings);
else if (state.io.pipeline.initialized())
processOrdinaryQueryWithProcessors(query_context->getSettingsRef().max_threads);
else
@ -317,12 +324,12 @@ void TCPHandler::runImpl()
}
void TCPHandler::readData(const Settings & global_settings)
void TCPHandler::readData(const Settings & connection_settings)
{
const auto receive_timeout = query_context->getSettingsRef().receive_timeout.value;
/// Poll interval should not be greater than receive_timeout
const size_t default_poll_interval = global_settings.poll_interval.value * 1000000;
const size_t default_poll_interval = connection_settings.poll_interval.value * 1000000;
size_t current_poll_interval = static_cast<size_t>(receive_timeout.totalMicroseconds());
constexpr size_t min_poll_interval = 5000; // 5 ms
size_t poll_interval = std::max(min_poll_interval, std::min(default_poll_interval, current_poll_interval));
@ -372,7 +379,7 @@ void TCPHandler::readData(const Settings & global_settings)
}
void TCPHandler::processInsertQuery(const Settings & global_settings)
void TCPHandler::processInsertQuery(const Settings & connection_settings)
{
/** Made above the rest of the lines, so that in case of `writePrefix` function throws an exception,
* client receive exception before sending data.
@ -393,7 +400,7 @@ void TCPHandler::processInsertQuery(const Settings & global_settings)
/// Send block to the client - table structure.
sendData(state.io.out->getHeader());
readData(global_settings);
readData(connection_settings);
state.io.out->writeSuffix();
state.io.onFinish();
}

View File

@ -68,7 +68,7 @@ struct EntropyData
while (reader.next())
{
const auto & pair = reader.get();
map[pair.getFirst()] = pair.getSecond();
map[pair.first] = pair.second;
}
}

View File

@ -49,12 +49,7 @@ static DataTypes convertLowCardinalityTypesToNested(const DataTypes & types)
DataTypes res_types;
res_types.reserve(types.size());
for (const auto & type : types)
{
if (auto * low_cardinality_type = typeid_cast<const DataTypeLowCardinality *>(type.get()))
res_types.push_back(low_cardinality_type->getDictionaryType());
else
res_types.push_back(type);
}
res_types.emplace_back(recursiveRemoveLowCardinality(type));
return res_types;
}
@ -69,7 +64,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
/// If one of types is Nullable, we apply aggregate function combinator "Null".
if (std::any_of(argument_types.begin(), argument_types.end(),
if (std::any_of(type_without_low_cardinality.begin(), type_without_low_cardinality.end(),
[](const auto & type) { return type->isNullable(); }))
{
AggregateFunctionCombinatorPtr combinator = AggregateFunctionCombinatorFactory::instance().tryFindSuffix("Null");
@ -83,11 +78,11 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
/// A little hack - if we have NULL arguments, don't even create nested function.
/// Combinator will check if nested_function was created.
if (name == "count" || std::none_of(argument_types.begin(), argument_types.end(),
if (name == "count" || std::none_of(type_without_low_cardinality.begin(), type_without_low_cardinality.end(),
[](const auto & type) { return type->onlyNull(); }))
nested_function = getImpl(name, nested_types, nested_parameters, recursion_level);
return combinator->transformAggregateFunction(nested_function, argument_types, parameters);
return combinator->transformAggregateFunction(nested_function, type_without_low_cardinality, parameters);
}
auto res = getImpl(name, type_without_low_cardinality, parameters, recursion_level);

View File

@ -72,7 +72,7 @@ struct QuantileExactWeighted
while (reader.next())
{
const auto & pair = reader.get();
map[pair.getFirst()] = pair.getSecond();
map[pair.first] = pair.second;
}
}
@ -98,7 +98,7 @@ struct QuantileExactWeighted
++i;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.getFirst() < b.getFirst(); });
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
UInt64 threshold = std::ceil(sum_weight * level);
UInt64 accumulated = 0;
@ -107,7 +107,7 @@ struct QuantileExactWeighted
const Pair * end = array + size;
while (it < end)
{
accumulated += it->getSecond();
accumulated += it->second;
if (accumulated >= threshold)
break;
@ -118,7 +118,7 @@ struct QuantileExactWeighted
if (it == end)
--it;
return it->getFirst();
return it->first;
}
/// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
@ -148,7 +148,7 @@ struct QuantileExactWeighted
++i;
}
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.getFirst() < b.getFirst(); });
std::sort(array, array + size, [](const Pair & a, const Pair & b) { return a.first < b.first; });
UInt64 accumulated = 0;
@ -160,11 +160,11 @@ struct QuantileExactWeighted
while (it < end)
{
accumulated += it->getSecond();
accumulated += it->second;
while (accumulated >= threshold)
{
result[indices[level_index]] = it->getFirst();
result[indices[level_index]] = it->first;
++level_index;
if (level_index == num_levels)
@ -178,7 +178,7 @@ struct QuantileExactWeighted
while (level_index < num_levels)
{
result[indices[level_index]] = array[size - 1].getFirst();
result[indices[level_index]] = array[size - 1].first;
++level_index;
}
}

View File

@ -11,7 +11,7 @@
#endif
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
#if !defined(__APPLE__) && !defined(__FreeBSD__)
#include <malloc.h>
@ -86,10 +86,8 @@ struct RandomHint
{
void * mmap_hint()
{
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(rng));
return reinterpret_cast<void *>(std::uniform_int_distribution<intptr_t>(0x100000000000UL, 0x700000000000UL)(thread_local_rng));
}
private:
pcg64 rng{randomSeed()};
};
}

View File

@ -5,7 +5,9 @@
#include <vector>
#include <boost/noncopyable.hpp>
#include <common/likely.h>
#include <sanitizer/asan_interface.h>
#if __has_include(<sanitizer/asan_interface.h>)
# include <sanitizer/asan_interface.h>
#endif
#include <Core/Defines.h>
#include <Common/memcpySmall.h>
#include <Common/ProfileEvents.h>

View File

@ -1,6 +1,9 @@
#pragma once
#include <sanitizer/asan_interface.h>
#if __has_include(<sanitizer/asan_interface.h>)
# include <sanitizer/asan_interface.h>
#endif
#include <Core/Defines.h>
#include <Common/Arena.h>
#include <Common/BitHelpers.h>

View File

@ -61,7 +61,7 @@ struct HashMethodOneNumber
/// Get StringRef from value which can be inserted into column.
static StringRef getValueRef(const Value & value)
{
return StringRef(reinterpret_cast<const char *>(&value.getFirst()), sizeof(value.getFirst()));
return StringRef(reinterpret_cast<const char *>(&value.first), sizeof(value.first));
}
};
@ -90,7 +90,7 @@ struct HashMethodString
return StringRef(chars + offsets[row - 1], offsets[row] - offsets[row - 1] - 1);
}
static StringRef getValueRef(const Value & value) { return StringRef(value.getFirst().data, value.getFirst().size); }
static StringRef getValueRef(const Value & value) { return value.first; }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;
@ -127,7 +127,7 @@ struct HashMethodFixedString
StringRef getKey(size_t row, Arena &) const { return StringRef(&(*chars)[row * n], n); }
static StringRef getValueRef(const Value & value) { return StringRef(value.getFirst().data, value.getFirst().size); }
static StringRef getValueRef(const Value & value) { return value.first; }
protected:
friend class columns_hashing_impl::HashMethodBase<Self, Value, Mapped, use_cache>;

View File

@ -39,7 +39,7 @@ struct LastElementCache
bool check(const Value & value_) { return !empty && value == value_; }
template <typename Key>
bool check(const Key & key) { return !empty && value.getFirst() == key; }
bool check(const Key & key) { return !empty && value.first == key; }
};
template <typename Data>
@ -147,8 +147,8 @@ protected:
if constexpr (has_mapped)
{
/// Init PairNoInit elements.
cache.value.getSecond() = Mapped();
cache.value.getFirstMutable() = {};
cache.value.second = Mapped();
cache.value.first = {};
}
else
cache.value = Value();
@ -170,7 +170,7 @@ protected:
static_cast<Derived &>(*this).onExistingKey(key, pool);
if constexpr (has_mapped)
return EmplaceResult(cache.value.getSecond(), cache.value.getSecond(), false);
return EmplaceResult(cache.value.second, cache.value.second, false);
else
return EmplaceResult(false);
}
@ -204,7 +204,7 @@ protected:
cache.empty = false;
if constexpr (has_mapped)
cached = &cache.value.getSecond();
cached = &cache.value.second;
}
if constexpr (has_mapped)
@ -221,7 +221,7 @@ protected:
if (cache.check(key))
{
if constexpr (has_mapped)
return FindResult(&cache.value.getSecond(), cache.found);
return FindResult(&cache.value.second, cache.found);
else
return FindResult(cache.found);
}
@ -240,7 +240,7 @@ protected:
else
{
if constexpr (has_mapped)
cache.value.getFirstMutable() = key;
cache.value.first = key;
else
cache.value = key;
}

View File

@ -6,13 +6,13 @@
M(Query, "Number of executing queries") \
M(Merge, "Number of executing background merges") \
M(PartMutation, "Number of mutations (ALTER DELETE/UPDATE)") \
M(ReplicatedFetch, "Number of data parts fetching from replica") \
M(ReplicatedSend, "Number of data parts sending to replicas") \
M(ReplicatedFetch, "Number of data parts being fetched from replica") \
M(ReplicatedSend, "Number of data parts being sent to replicas") \
M(ReplicatedChecks, "Number of data parts checking for consistency") \
M(BackgroundPoolTask, "Number of active tasks in BackgroundProcessingPool (merges, mutations, fetches or replication queue bookkeeping)") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic tasks of ReplicatedMergeTree like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than total size of currently merging parts.") \
M(DistributedSend, "Number of connections sending data, that was INSERTed to Distributed tables, to remote servers. Both synchronous and asynchronous mode.") \
M(BackgroundPoolTask, "Number of active tasks in BackgroundProcessingPool (merges, mutations, fetches, or replication queue bookkeeping)") \
M(BackgroundSchedulePoolTask, "Number of active tasks in BackgroundSchedulePool. This pool is used for periodic ReplicatedMergeTree tasks, like cleaning old data parts, altering data parts, replica re-initialization, etc.") \
M(DiskSpaceReservedForMerge, "Disk space reserved for currently running background merges. It is slightly more than the total size of currently merging parts.") \
M(DistributedSend, "Number of connections to remote servers sending data that was INSERTed into Distributed tables. Both synchronous and asynchronous mode.") \
M(QueryPreempted, "Number of queries that are stopped and waiting due to 'priority' setting.") \
M(TCPConnection, "Number of connections to TCP server (clients with native interface)") \
M(HTTPConnection, "Number of connections to HTTP server") \
@ -45,6 +45,10 @@
M(RWLockWaitingWriters, "Number of threads waiting for write on a table RWLock.") \
M(RWLockActiveReaders, "Number of threads holding read lock in a table RWLock.") \
M(RWLockActiveWriters, "Number of threads holding write lock in a table RWLock.") \
M(GlobalThread, "Number of threads in global thread pool.") \
M(GlobalThreadActive, "Number of threads in global thread pool running a task.") \
M(LocalThread, "Number of threads in local thread pools. Should be similar to GlobalThreadActive.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
namespace CurrentMetrics

1033
dbms/src/Common/Dwarf.cpp Normal file

File diff suppressed because it is too large Load Diff

287
dbms/src/Common/Dwarf.h Normal file
View File

@ -0,0 +1,287 @@
#pragma once
/*
* Copyright 2012-present Facebook, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/** This file was edited for ClickHouse.
*/
#include <string>
#include <string_view>
#include <variant>
namespace DB
{
class Elf;
/**
* DWARF record parser.
*
* We only implement enough DWARF functionality to convert from PC address
* to file and line number information.
*
* This means (although they're not part of the public API of this class), we
* can parse Debug Information Entries (DIEs), abbreviations, attributes (of
* all forms), and we can interpret bytecode for the line number VM.
*
* We can interpret DWARF records of version 2, 3, or 4, although we don't
* actually support many of the version 4 features (such as VLIW, multiple
* operations per instruction)
*
* Note that the DWARF record parser does not allocate heap memory at all.
* This is on purpose: you can use the parser from
* memory-constrained situations (such as an exception handler for
* std::out_of_memory) If it weren't for this requirement, some things would
* be much simpler: the Path class would be unnecessary and would be replaced
* with a std::string; the list of file names in the line number VM would be
* kept as a vector of strings instead of re-executing the program to look for
* DW_LNE_define_file instructions, etc.
*/
class Dwarf final
{
// Note that Dwarf uses (and returns) std::string_view a lot.
// The std::string_view point within sections in the ELF file, and so will
// be live for as long as the passed-in Elf is live.
public:
/** Create a DWARF parser around an ELF file. */
explicit Dwarf(const Elf & elf);
/**
* Represent a file path a s collection of three parts (base directory,
* subdirectory, and file).
*/
class Path
{
public:
Path() {}
Path(std::string_view baseDir, std::string_view subDir, std::string_view file);
std::string_view baseDir() const { return baseDir_; }
std::string_view subDir() const { return subDir_; }
std::string_view file() const { return file_; }
size_t size() const;
/**
* Copy the Path to a buffer of size bufSize.
*
* toBuffer behaves like snprintf: It will always null-terminate the
* buffer (so it will copy at most bufSize-1 bytes), and it will return
* the number of bytes that would have been written if there had been
* enough room, so, if toBuffer returns a value >= bufSize, the output
* was truncated.
*/
size_t toBuffer(char * buf, size_t bufSize) const;
void toString(std::string & dest) const;
std::string toString() const
{
std::string s;
toString(s);
return s;
}
// TODO(tudorb): Implement operator==, operator!=; not as easy as it
// seems as the same path can be represented in multiple ways
private:
std::string_view baseDir_;
std::string_view subDir_;
std::string_view file_;
};
enum class LocationInfoMode
{
// Don't resolve location info.
DISABLED,
// Perform CU lookup using .debug_aranges (might be incomplete).
FAST,
// Scan all CU in .debug_info (slow!) on .debug_aranges lookup failure.
FULL,
};
struct LocationInfo
{
bool hasMainFile = false;
Path mainFile;
bool hasFileAndLine = false;
Path file;
uint64_t line = 0;
};
/**
* Find the file and line number information corresponding to address.
*/
bool findAddress(uintptr_t address, LocationInfo & info, LocationInfoMode mode) const;
private:
static bool findDebugInfoOffset(uintptr_t address, std::string_view aranges, uint64_t & offset);
void init();
bool findLocation(uintptr_t address, std::string_view & infoEntry, LocationInfo & info) const;
const Elf * elf_;
// DWARF section made up of chunks, each prefixed with a length header.
// The length indicates whether the chunk is DWARF-32 or DWARF-64, which
// guides interpretation of "section offset" records.
// (yes, DWARF-32 and DWARF-64 sections may coexist in the same file)
class Section
{
public:
Section() : is64Bit_(false) {}
explicit Section(std::string_view d);
// Return next chunk, if any; the 4- or 12-byte length was already
// parsed and isn't part of the chunk.
bool next(std::string_view & chunk);
// Is the current chunk 64 bit?
bool is64Bit() const { return is64Bit_; }
private:
// Yes, 32- and 64- bit sections may coexist. Yikes!
bool is64Bit_;
std::string_view data_;
};
// Abbreviation for a Debugging Information Entry.
struct DIEAbbreviation
{
uint64_t code;
uint64_t tag;
bool hasChildren;
struct Attribute
{
uint64_t name;
uint64_t form;
};
std::string_view attributes;
};
// Interpreter for the line number bytecode VM
class LineNumberVM
{
public:
LineNumberVM(std::string_view data, std::string_view compilationDirectory);
bool findAddress(uintptr_t address, Path & file, uint64_t & line);
private:
void init();
void reset();
// Execute until we commit one new row to the line number matrix
bool next(std::string_view & program);
enum StepResult
{
CONTINUE, // Continue feeding opcodes
COMMIT, // Commit new <address, file, line> tuple
END, // End of sequence
};
// Execute one opcode
StepResult step(std::string_view & program);
struct FileName
{
std::string_view relativeName;
// 0 = current compilation directory
// otherwise, 1-based index in the list of include directories
uint64_t directoryIndex;
};
// Read one FileName object, remove_prefix sp
static bool readFileName(std::string_view & sp, FileName & fn);
// Get file name at given index; may be in the initial table
// (fileNames_) or defined using DW_LNE_define_file (and we reexecute
// enough of the program to find it, if so)
FileName getFileName(uint64_t index) const;
// Get include directory at given index
std::string_view getIncludeDirectory(uint64_t index) const;
// Execute opcodes until finding a DW_LNE_define_file and return true;
// return file at the end.
bool nextDefineFile(std::string_view & program, FileName & fn) const;
// Initialization
bool is64Bit_;
std::string_view data_;
std::string_view compilationDirectory_;
// Header
uint16_t version_;
uint8_t minLength_;
bool defaultIsStmt_;
int8_t lineBase_;
uint8_t lineRange_;
uint8_t opcodeBase_;
const uint8_t * standardOpcodeLengths_;
std::string_view includeDirectories_;
size_t includeDirectoryCount_;
std::string_view fileNames_;
size_t fileNameCount_;
// State machine registers
uint64_t address_;
uint64_t file_;
uint64_t line_;
uint64_t column_;
bool isStmt_;
bool basicBlock_;
bool endSequence_;
bool prologueEnd_;
bool epilogueBegin_;
uint64_t isa_;
uint64_t discriminator_;
};
// Read an abbreviation from a std::string_view, return true if at end; remove_prefix sp
static bool readAbbreviation(std::string_view & sp, DIEAbbreviation & abbr);
// Get abbreviation corresponding to a code, in the chunk starting at
// offset in the .debug_abbrev section
DIEAbbreviation getAbbreviation(uint64_t code, uint64_t offset) const;
// Read one attribute <name, form> pair, remove_prefix sp; returns <0, 0> at end.
static DIEAbbreviation::Attribute readAttribute(std::string_view & sp);
// Read one attribute value, remove_prefix sp
typedef std::variant<uint64_t, std::string_view> AttributeValue;
AttributeValue readAttributeValue(std::string_view & sp, uint64_t form, bool is64Bit) const;
// Get an ELF section by name, return true if found
bool getSection(const char * name, std::string_view * section) const;
// Get a string from the .debug_str section
std::string_view getStringFromStringSection(uint64_t offset) const;
std::string_view info_; // .debug_info
std::string_view abbrev_; // .debug_abbrev
std::string_view aranges_; // .debug_aranges
std::string_view line_; // .debug_line
std::string_view strings_; // .debug_str
};
}

130
dbms/src/Common/Elf.cpp Normal file
View File

@ -0,0 +1,130 @@
#include <Common/Elf.h>
#include <Common/Exception.h>
#include <string.h>
namespace DB
{
namespace ErrorCodes
{
extern const int CANNOT_PARSE_ELF;
}
Elf::Elf(const std::string & path)
: in(path, 0)
{
/// Check if it's an elf.
elf_size = in.buffer().size();
if (elf_size < sizeof(ElfEhdr))
throw Exception("The size of supposedly ELF file is too small", ErrorCodes::CANNOT_PARSE_ELF);
mapped = in.buffer().begin();
header = reinterpret_cast<const ElfEhdr *>(mapped);
if (memcmp(header->e_ident, "\x7F""ELF", 4) != 0)
throw Exception("The file is not ELF according to magic", ErrorCodes::CANNOT_PARSE_ELF);
/// Get section header.
ElfOff section_header_offset = header->e_shoff;
uint16_t section_header_num_entries = header->e_shnum;
if (!section_header_offset
|| !section_header_num_entries
|| section_header_offset + section_header_num_entries * sizeof(ElfShdr) > elf_size)
throw Exception("The ELF is truncated (section header points after end of file)", ErrorCodes::CANNOT_PARSE_ELF);
section_headers = reinterpret_cast<const ElfShdr *>(mapped + section_header_offset);
/// The string table with section names.
auto section_names_strtab = findSection([&](const Section & section, size_t idx)
{
return section.header.sh_type == SHT_STRTAB && header->e_shstrndx == idx;
});
if (!section_names_strtab)
throw Exception("The ELF doesn't have string table with section names", ErrorCodes::CANNOT_PARSE_ELF);
ElfOff section_names_offset = section_names_strtab->header.sh_offset;
if (section_names_offset >= elf_size)
throw Exception("The ELF is truncated (section names string table points after end of file)", ErrorCodes::CANNOT_PARSE_ELF);
section_names = reinterpret_cast<const char *>(mapped + section_names_offset);
}
Elf::Section::Section(const ElfShdr & header, const Elf & elf)
: header(header), elf(elf)
{
}
bool Elf::iterateSections(std::function<bool(const Section & section, size_t idx)> && pred) const
{
for (size_t idx = 0; idx < header->e_shnum; ++idx)
{
Section section(section_headers[idx], *this);
/// Sections spans after end of file.
if (section.header.sh_offset + section.header.sh_size > elf_size)
continue;
if (pred(section, idx))
return true;
}
return false;
}
std::optional<Elf::Section> Elf::findSection(std::function<bool(const Section & section, size_t idx)> && pred) const
{
std::optional<Elf::Section> result;
iterateSections([&](const Section & section, size_t idx)
{
if (pred(section, idx))
{
result.emplace(section);
return true;
}
return false;
});
return result;
}
std::optional<Elf::Section> Elf::findSectionByName(const char * name) const
{
return findSection([&](const Section & section, size_t) { return 0 == strcmp(name, section.name()); });
}
const char * Elf::Section::name() const
{
if (!elf.section_names)
throw Exception("Section names are not initialized", ErrorCodes::CANNOT_PARSE_ELF);
/// TODO buffer overflow is possible, we may need to check strlen.
return elf.section_names + header.sh_name;
}
const char * Elf::Section::begin() const
{
return elf.mapped + header.sh_offset;
}
const char * Elf::Section::end() const
{
return begin() + size();
}
size_t Elf::Section::size() const
{
return header.sh_size;
}
}

63
dbms/src/Common/Elf.h Normal file
View File

@ -0,0 +1,63 @@
#pragma once
#include <IO/MMapReadBufferFromFile.h>
#include <string>
#include <optional>
#include <functional>
#include <elf.h>
#include <link.h>
using ElfAddr = ElfW(Addr);
using ElfEhdr = ElfW(Ehdr);
using ElfOff = ElfW(Off);
using ElfPhdr = ElfW(Phdr);
using ElfShdr = ElfW(Shdr);
using ElfSym = ElfW(Sym);
namespace DB
{
/** Allow to navigate sections in ELF.
*/
class Elf final
{
public:
struct Section
{
const ElfShdr & header;
const char * name() const;
const char * begin() const;
const char * end() const;
size_t size() const;
Section(const ElfShdr & header, const Elf & elf);
private:
const Elf & elf;
};
explicit Elf(const std::string & path);
bool iterateSections(std::function<bool(const Section & section, size_t idx)> && pred) const;
std::optional<Section> findSection(std::function<bool(const Section & section, size_t idx)> && pred) const;
std::optional<Section> findSectionByName(const char * name) const;
const char * begin() const { return mapped; }
const char * end() const { return mapped + elf_size; }
size_t size() const { return elf_size; }
private:
MMapReadBufferFromFile in;
size_t elf_size;
const char * mapped;
const ElfEhdr * header;
const ElfShdr * section_headers;
const char * section_names = nullptr;
};
}

View File

@ -438,6 +438,10 @@ namespace ErrorCodes
extern const int CANNOT_SET_TIMER_PERIOD = 461;
extern const int CANNOT_DELETE_TIMER = 462;
extern const int CANNOT_FCNTL = 463;
extern const int CANNOT_PARSE_ELF = 464;
extern const int CANNOT_PARSE_DWARF = 465;
extern const int INSECURE_PATH = 466;
extern const int CANNOT_PARSE_BOOL = 467;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -6,7 +6,7 @@
#include <Poco/Exception.h>
#include <common/StackTrace.h>
#include <Common/StackTrace.h>
namespace Poco { class Logger; }

View File

@ -11,32 +11,28 @@
*/
struct NoInitTag {};
struct NoInitTag
{
};
/// A pair that does not initialize the elements, if not needed.
template <typename First, typename Second>
class PairNoInit
struct PairNoInit
{
First first;
Second second;
template <typename, typename, typename, typename>
friend class HashMapCell;
public:
PairNoInit() {}
template <typename First_>
PairNoInit(First_ && first_, NoInitTag)
: first(std::forward<First_>(first_)) {}
PairNoInit(First_ && first_, NoInitTag) : first(std::forward<First_>(first_))
{
}
template <typename First_, typename Second_>
PairNoInit(First_ && first_, Second_ && second_)
: first(std::forward<First_>(first_)), second(std::forward<Second_>(second_)) {}
First & getFirstMutable() { return first; }
const First & getFirst() const { return first; }
Second & getSecond() { return second; }
const Second & getSecond() const { return second; }
PairNoInit(First_ && first_, Second_ && second_) : first(std::forward<First_>(first_)), second(std::forward<Second_>(second_))
{
}
};
@ -123,8 +119,8 @@ struct HashMapCellWithSavedHash : public HashMapCell<Key, TMapped, Hash, TState>
using Base::Base;
bool keyEquals(const Key & key_) const { return this->value.getFirst() == key_; }
bool keyEquals(const Key & key_, size_t hash_) const { return saved_hash == hash_ && this->value.getFirst() == key_; }
bool keyEquals(const Key & key_) const { return this->value.first == key_; }
bool keyEquals(const Key & key_, size_t hash_) const { return saved_hash == hash_ && this->value.first == key_; }
bool keyEquals(const Key & key_, size_t hash_, const typename Base::State &) const { return keyEquals(key_, hash_); }
void setHash(size_t hash_value) { saved_hash = hash_value; }

View File

@ -1,8 +1,6 @@
#include <Common/config.h>
#include "MiAllocator.h"
#if USE_MIMALLOC
#include "MiAllocator.h"
#include <mimalloc.h>
#include <Common/Exception.h>

View File

@ -2,10 +2,7 @@
#include <Common/config.h>
#if !USE_MIMALLOC
#error "do not include this file until USE_MIMALLOC is set to 1"
#endif
#if USE_MIMALLOC
#include <cstddef>
namespace DB
@ -26,3 +23,5 @@ struct MiAllocator
};
}
#endif

View File

@ -5,14 +5,14 @@
/// Available events. Add something here as you wish.
#define APPLY_FOR_EVENTS(M) \
M(Query, "Number of queries started to be interpreted and maybe executed. Does not include queries that are failed to parse, that are rejected due to AST size limits; rejected due to quota limits or limits on number of simultaneously running queries. May include internal queries initiated by ClickHouse itself. Does not count subqueries.") \
M(Query, "Number of queries to be interpreted and potentially executed. Does not include queries that failed to parse or were rejected due to AST size limits, quota limits or limits on the number of simultaneously running queries. May include internal queries initiated by ClickHouse itself. Does not count subqueries.") \
M(SelectQuery, "Same as Query, but only for SELECT queries.") \
M(InsertQuery, "Same as Query, but only for INSERT queries.") \
M(FileOpen, "Number of files opened.") \
M(Seek, "Number of times the 'lseek' function was called.") \
M(ReadBufferFromFileDescriptorRead, "Number of reads (read/pread) from a file descriptor. Does not include sockets.") \
M(ReadBufferFromFileDescriptorReadFailed, "Number of times the read (read/pread) from a file descriptor have failed.") \
M(ReadBufferFromFileDescriptorReadBytes, "Number of bytes read from file descriptors. If the file is compressed, this will show compressed data size.") \
M(ReadBufferFromFileDescriptorReadBytes, "Number of bytes read from file descriptors. If the file is compressed, this will show the compressed data size.") \
M(WriteBufferFromFileDescriptorWrite, "Number of writes (write/pwrite) to a file descriptor. Does not include sockets.") \
M(WriteBufferFromFileDescriptorWriteFailed, "Number of times the write (write/pwrite) to a file descriptor have failed.") \
M(WriteBufferFromFileDescriptorWriteBytes, "Number of bytes written to file descriptors. If the file is compressed, this will show compressed data size.") \

View File

@ -1,16 +1,15 @@
#include "QueryProfiler.h"
#include <random>
#include <pcg_random.hpp>
#include <common/Pipe.h>
#include <common/phdr_cache.h>
#include <common/config_common.h>
#include <common/StackTrace.h>
#include <Common/StackTrace.h>
#include <common/StringRef.h>
#include <common/logger_useful.h>
#include <Common/CurrentThread.h>
#include <Common/Exception.h>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromFileDescriptor.h>
@ -63,7 +62,6 @@ namespace
constexpr size_t QUERY_ID_MAX_LEN = 1024;
thread_local size_t write_trace_iteration = 0;
thread_local pcg64 rng{randomSeed()};
void writeTraceInfo(TimerType timer_type, int /* sig */, siginfo_t * info, void * context)
{
@ -87,7 +85,8 @@ namespace
constexpr size_t buf_size = sizeof(char) + // TraceCollector stop flag
8 * sizeof(char) + // maximum VarUInt length for string size
QUERY_ID_MAX_LEN * sizeof(char) + // maximum query_id length
sizeof(StackTrace) + // collected stack trace
sizeof(UInt8) + // number of stack frames
sizeof(StackTrace::Frames) + // collected stack trace, maximum capacity
sizeof(TimerType) + // timer type
sizeof(UInt32); // thread_number
char buffer[buf_size];
@ -103,13 +102,19 @@ namespace
writeChar(false, out);
writeStringBinary(query_id, out);
writePODBinary(stack_trace, out);
size_t stack_trace_size = stack_trace.getSize();
size_t stack_trace_offset = stack_trace.getOffset();
writeIntBinary(UInt8(stack_trace_size - stack_trace_offset), out);
for (size_t i = stack_trace_offset; i < stack_trace_size; ++i)
writePODBinary(stack_trace.getFrames()[i], out);
writePODBinary(timer_type, out);
writePODBinary(thread_number, out);
out.next();
}
const UInt32 TIMER_PRECISION = 1e9;
[[maybe_unused]] const UInt32 TIMER_PRECISION = 1e9;
}
namespace ErrorCodes
@ -153,7 +158,12 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
struct sigevent sev;
sev.sigev_notify = SIGEV_THREAD_ID;
sev.sigev_signo = pause_signal;
#if defined(__FreeBSD__)
sev._sigev_un._threadid = thread_id;
#else
sev._sigev_un._tid = thread_id;
#endif
if (timer_create(clock_type, &sev, &timer_id))
throwFromErrno("Failed to create thread timer", ErrorCodes::CANNOT_CREATE_TIMER);
@ -161,7 +171,7 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
/// It will allow to sample short queries even if timer period is large.
/// (For example, with period of 1 second, query with 50 ms duration will be sampled with 1 / 20 probability).
/// It also helps to avoid interference (moire).
UInt32 period_rand = std::uniform_int_distribution<UInt32>(0, period)(rng);
UInt32 period_rand = std::uniform_int_distribution<UInt32>(0, period)(thread_local_rng);
struct timespec interval{.tv_sec = period / TIMER_PRECISION, .tv_nsec = period % TIMER_PRECISION};
struct timespec offset{.tv_sec = period_rand / TIMER_PRECISION, .tv_nsec = period_rand % TIMER_PRECISION};
@ -176,7 +186,11 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const Int32 thread_id, const
throw;
}
#else
UNUSED(thread_id, clock_type, period, pause_signal);
UNUSED(thread_id);
UNUSED(clock_type);
UNUSED(period);
UNUSED(pause_signal);
throw Exception("QueryProfiler cannot work with stock libunwind", ErrorCodes::NOT_IMPLEMENTED);
#endif
}

View File

@ -20,6 +20,9 @@ SharedLibrary::SharedLibrary(const std::string & path, int flags)
throw Exception(std::string("Cannot dlopen: ") + dlerror(), ErrorCodes::CANNOT_DLOPEN);
updatePHDRCache();
/// NOTE: race condition exists when loading multiple shared libraries concurrently.
/// We don't care (or add global mutex for this method).
}
SharedLibrary::~SharedLibrary()

View File

@ -1,16 +1,15 @@
#include <common/StackTrace.h>
#include <common/SimpleCache.h>
#include <common/demangle.h>
#include <Common/config.h>
#include <Common/StackTrace.h>
#include <Common/SymbolIndex.h>
#include <Common/Dwarf.h>
#include <Common/Elf.h>
#include <sstream>
#include <filesystem>
#include <unordered_map>
#include <cstring>
#include <cxxabi.h>
#include <execinfo.h>
#if USE_UNWIND
#define UNW_LOCAL_ONLY
#include <libunwind.h>
#endif
std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext_t & context)
{
@ -31,6 +30,8 @@ std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext
error << " Access: write.";
else
error << " Access: read.";
#else
UNUSED(context);
#endif
switch (info.si_code)
@ -155,7 +156,7 @@ std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext
return error.str();
}
void * getCallerAddress(const ucontext_t & context)
static void * getCallerAddress(const ucontext_t & context)
{
#if defined(__x86_64__)
/// Get the address at the time the signal was raised from the RIP (x86-64)
@ -168,9 +169,9 @@ void * getCallerAddress(const ucontext_t & context)
#endif
#elif defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext.pc);
#endif
#else
return nullptr;
#endif
}
StackTrace::StackTrace()
@ -182,12 +183,25 @@ StackTrace::StackTrace(const ucontext_t & signal_context)
{
tryCapture();
if (size == 0)
void * caller_address = getCallerAddress(signal_context);
if (size == 0 && caller_address)
{
/// No stack trace was captured. At least we can try parsing caller address
void * caller_address = getCallerAddress(signal_context);
if (caller_address)
frames[size++] = reinterpret_cast<void *>(caller_address);
frames[0] = caller_address;
size = 1;
}
else
{
/// Skip excessive stack frames that we have created while finding stack trace.
for (size_t i = 0; i < size; ++i)
{
if (frames[i] == caller_address)
{
offset = i;
break;
}
}
}
}
@ -195,6 +209,12 @@ StackTrace::StackTrace(NoCapture)
{
}
#if USE_UNWIND
extern "C" int unw_backtrace(void **, int);
#endif
void StackTrace::tryCapture()
{
size = 0;
@ -208,69 +228,70 @@ size_t StackTrace::getSize() const
return size;
}
size_t StackTrace::getOffset() const
{
return offset;
}
const StackTrace::Frames & StackTrace::getFrames() const
{
return frames;
}
static std::string toStringImpl(const StackTrace::Frames & frames, size_t offset, size_t size)
{
if (size == 0)
return "<Empty trace>";
const DB::SymbolIndex & symbol_index = DB::SymbolIndex::instance();
std::unordered_map<std::string, DB::Dwarf> dwarfs;
std::stringstream out;
for (size_t i = offset; i < size; ++i)
{
const void * addr = frames[i];
out << "#" << i << " " << addr << " ";
auto symbol = symbol_index.findSymbol(addr);
if (symbol)
{
int status = 0;
out << demangle(symbol->name, status);
}
else
out << "?";
out << " ";
if (auto object = symbol_index.findObject(addr))
{
if (std::filesystem::exists(object->name))
{
auto dwarf_it = dwarfs.try_emplace(object->name, *object->elf).first;
DB::Dwarf::LocationInfo location;
if (dwarf_it->second.findAddress(uintptr_t(addr) - uintptr_t(object->address_begin), location, DB::Dwarf::LocationInfoMode::FAST))
out << location.file.toString() << ":" << location.line;
else
out << object->name;
}
}
else
out << "?";
out << "\n";
}
return out.str();
}
std::string StackTrace::toString() const
{
/// Calculation of stack trace text is extremely slow.
/// We use simple cache because otherwise the server could be overloaded by trash queries.
static SimpleCache<decltype(StackTrace::toStringImpl), &StackTrace::toStringImpl> func_cached;
return func_cached(frames, size);
}
std::string StackTrace::toStringImpl(const Frames & frames, size_t size)
{
if (size == 0)
return "<Empty trace>";
char ** symbols = backtrace_symbols(frames.data(), size);
if (!symbols)
return "<Invalid trace>";
std::stringstream backtrace;
try
{
for (size_t i = 0; i < size; i++)
{
/// We do "demangling" of names. The name is in parenthesis, before the '+' character.
char * name_start = nullptr;
char * name_end = nullptr;
std::string demangled_name;
int status = 0;
if (nullptr != (name_start = strchr(symbols[i], '('))
&& nullptr != (name_end = strchr(name_start, '+')))
{
++name_start;
*name_end = '\0';
demangled_name = demangle(name_start, status);
*name_end = '+';
}
backtrace << i << ". ";
if (0 == status && name_start && name_end)
{
backtrace.write(symbols[i], name_start - symbols[i]);
backtrace << demangled_name << name_end;
}
else
backtrace << symbols[i];
backtrace << std::endl;
}
}
catch (...)
{
free(symbols);
throw;
}
free(symbols);
return backtrace.str();
static SimpleCache<decltype(toStringImpl), &toStringImpl> func_cached;
return func_cached(frames, offset, size);
}

View File

@ -34,28 +34,17 @@ public:
/// Creates empty object for deferred initialization
StackTrace(NoCapture);
/// Fills stack trace frames with provided sequence
template <typename Iterator>
StackTrace(Iterator it, Iterator end)
{
while (size < capacity && it != end)
{
frames[size++] = *(it++);
}
}
size_t getSize() const;
size_t getOffset() const;
const Frames & getFrames() const;
std::string toString() const;
protected:
void tryCapture();
static std::string toStringImpl(const Frames & frames, size_t size);
size_t size = 0;
size_t offset = 0; /// How many frames to skip while displaying.
Frames frames{};
};
std::string signalToErrorMessage(int sig, const siginfo_t & info, const ucontext_t & context);
void * getCallerAddress(const ucontext_t & context);

View File

@ -0,0 +1,318 @@
#include <Common/SymbolIndex.h>
#include <algorithm>
#include <optional>
#include <link.h>
//#include <iostream>
#include <filesystem>
namespace DB
{
namespace
{
/// Notes: "PHDR" is "Program Headers".
/// To look at program headers, run:
/// readelf -l ./clickhouse-server
/// To look at section headers, run:
/// readelf -S ./clickhouse-server
/// Also look at: https://wiki.osdev.org/ELF
/// Also look at: man elf
/// http://www.linker-aliens.org/blogs/ali/entry/inside_elf_symbol_tables/
/// https://stackoverflow.com/questions/32088140/multiple-string-tables-in-elf-object
/// Based on the code of musl-libc and the answer of Kanalpiroge on
/// https://stackoverflow.com/questions/15779185/list-all-the-functions-symbols-on-the-fly-in-c-code-on-a-linux-architecture
/// It does not extract all the symbols (but only public - exported and used for dynamic linking),
/// but will work if we cannot find or parse ELF files.
void collectSymbolsFromProgramHeaders(dl_phdr_info * info,
std::vector<SymbolIndex::Symbol> & symbols)
{
/* Iterate over all headers of the current shared lib
* (first call is for the executable itself)
*/
for (size_t header_index = 0; header_index < info->dlpi_phnum; ++header_index)
{
/* Further processing is only needed if the dynamic section is reached
*/
if (info->dlpi_phdr[header_index].p_type != PT_DYNAMIC)
continue;
/* Get a pointer to the first entry of the dynamic section.
* It's address is the shared lib's address + the virtual address
*/
const ElfW(Dyn) * dyn_begin = reinterpret_cast<const ElfW(Dyn) *>(info->dlpi_addr + info->dlpi_phdr[header_index].p_vaddr);
/// For unknown reason, addresses are sometimes relative sometimes absolute.
auto correct_address = [](ElfW(Addr) base, ElfW(Addr) ptr)
{
return ptr > base ? ptr : base + ptr;
};
/* Iterate over all entries of the dynamic section until the
* end of the symbol table is reached. This is indicated by
* an entry with d_tag == DT_NULL.
*/
size_t sym_cnt = 0;
for (auto it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
if (it->d_tag == DT_HASH)
{
const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
sym_cnt = hash[1];
break;
}
else if (it->d_tag == DT_GNU_HASH)
{
/// This code based on Musl-libc.
const uint32_t * buckets = nullptr;
const uint32_t * hashval = nullptr;
const ElfW(Word) * hash = reinterpret_cast<const ElfW(Word) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
buckets = hash + 4 + (hash[2] * sizeof(size_t) / 4);
for (ElfW(Word) i = 0; i < hash[0]; ++i)
if (buckets[i] > sym_cnt)
sym_cnt = buckets[i];
if (sym_cnt)
{
sym_cnt -= hash[1];
hashval = buckets + hash[0] + sym_cnt;
do
{
++sym_cnt;
}
while (!(*hashval++ & 1));
}
break;
}
}
if (!sym_cnt)
continue;
const char * strtab = nullptr;
for (auto it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
if (it->d_tag == DT_STRTAB)
{
strtab = reinterpret_cast<const char *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
break;
}
}
if (!strtab)
continue;
for (auto it = dyn_begin; it->d_tag != DT_NULL; ++it)
{
if (it->d_tag == DT_SYMTAB)
{
/* Get the pointer to the first entry of the symbol table */
const ElfW(Sym) * elf_sym = reinterpret_cast<const ElfW(Sym) *>(correct_address(info->dlpi_addr, it->d_un.d_ptr));
/* Iterate over the symbol table */
for (ElfW(Word) sym_index = 0; sym_index < sym_cnt; ++sym_index)
{
/// We are not interested in empty symbols.
if (!elf_sym[sym_index].st_size)
continue;
/* Get the name of the sym_index-th symbol.
* This is located at the address of st_name relative to the beginning of the string table.
*/
const char * sym_name = &strtab[elf_sym[sym_index].st_name];
if (!sym_name)
continue;
SymbolIndex::Symbol symbol;
symbol.address_begin = reinterpret_cast<const void *>(info->dlpi_addr + elf_sym[sym_index].st_value);
symbol.address_end = reinterpret_cast<const void *>(info->dlpi_addr + elf_sym[sym_index].st_value + elf_sym[sym_index].st_size);
symbol.name = sym_name;
symbols.push_back(std::move(symbol));
}
break;
}
}
}
}
void collectSymbolsFromELFSymbolTable(
dl_phdr_info * info,
const Elf & elf,
const Elf::Section & symbol_table,
const Elf::Section & string_table,
std::vector<SymbolIndex::Symbol> & symbols)
{
/// Iterate symbol table.
const ElfSym * symbol_table_entry = reinterpret_cast<const ElfSym *>(symbol_table.begin());
const ElfSym * symbol_table_end = reinterpret_cast<const ElfSym *>(symbol_table.end());
const char * strings = string_table.begin();
for (; symbol_table_entry < symbol_table_end; ++symbol_table_entry)
{
if (!symbol_table_entry->st_name
|| !symbol_table_entry->st_value
|| !symbol_table_entry->st_size
|| strings + symbol_table_entry->st_name >= elf.end())
continue;
/// Find the name in strings table.
const char * symbol_name = strings + symbol_table_entry->st_name;
if (!symbol_name)
continue;
SymbolIndex::Symbol symbol;
symbol.address_begin = reinterpret_cast<const void *>(info->dlpi_addr + symbol_table_entry->st_value);
symbol.address_end = reinterpret_cast<const void *>(info->dlpi_addr + symbol_table_entry->st_value + symbol_table_entry->st_size);
symbol.name = symbol_name;
symbols.push_back(std::move(symbol));
}
}
bool searchAndCollectSymbolsFromELFSymbolTable(
dl_phdr_info * info,
const Elf & elf,
unsigned section_header_type,
const char * string_table_name,
std::vector<SymbolIndex::Symbol> & symbols)
{
std::optional<Elf::Section> symbol_table;
std::optional<Elf::Section> string_table;
if (!elf.iterateSections([&](const Elf::Section & section, size_t)
{
if (section.header.sh_type == section_header_type)
symbol_table.emplace(section);
else if (section.header.sh_type == SHT_STRTAB && 0 == strcmp(section.name(), string_table_name))
string_table.emplace(section);
if (symbol_table && string_table)
return true;
return false;
}))
{
return false;
}
collectSymbolsFromELFSymbolTable(info, elf, *symbol_table, *string_table, symbols);
return true;
}
void collectSymbolsFromELF(dl_phdr_info * info,
std::vector<SymbolIndex::Symbol> & symbols,
std::vector<SymbolIndex::Object> & objects)
{
std::string object_name = info->dlpi_name;
/// If the name is empty - it's main executable.
/// Find a elf file for the main executable.
if (object_name.empty())
object_name = "/proc/self/exe";
std::error_code ec;
std::filesystem::path canonical_path = std::filesystem::canonical(object_name, ec);
if (ec)
return;
/// Debug info and symbol table sections may be splitted to separate binary.
std::filesystem::path debug_info_path = std::filesystem::path("/usr/lib/debug") / canonical_path.relative_path();
object_name = std::filesystem::exists(debug_info_path) ? debug_info_path : canonical_path;
SymbolIndex::Object object;
object.elf = std::make_unique<Elf>(object_name);
object.address_begin = reinterpret_cast<const void *>(info->dlpi_addr);
object.address_end = reinterpret_cast<const void *>(info->dlpi_addr + object.elf->size());
object.name = object_name;
objects.push_back(std::move(object));
searchAndCollectSymbolsFromELFSymbolTable(info, *objects.back().elf, SHT_SYMTAB, ".strtab", symbols);
/// Unneeded because they were parsed from "program headers" of loaded objects.
//searchAndCollectSymbolsFromELFSymbolTable(info, *objects.back().elf, SHT_DYNSYM, ".dynstr", symbols);
}
/* Callback for dl_iterate_phdr.
* Is called by dl_iterate_phdr for every loaded shared lib until something
* else than 0 is returned by one call of this function.
*/
int collectSymbols(dl_phdr_info * info, size_t, void * data_ptr)
{
SymbolIndex::Data & data = *reinterpret_cast<SymbolIndex::Data *>(data_ptr);
collectSymbolsFromProgramHeaders(info, data.symbols);
collectSymbolsFromELF(info, data.symbols, data.objects);
/* Continue iterations */
return 0;
}
template <typename T>
const T * find(const void * address, const std::vector<T> & vec)
{
/// First range that has left boundary greater than address.
auto it = std::lower_bound(vec.begin(), vec.end(), address,
[](const T & symbol, const void * addr) { return symbol.address_begin <= addr; });
if (it == vec.begin())
return nullptr;
else
--it; /// Last range that has left boundary less or equals than address.
if (address >= it->address_begin && address < it->address_end)
return &*it;
else
return nullptr;
}
}
void SymbolIndex::update()
{
dl_iterate_phdr(collectSymbols, &data.symbols);
std::sort(data.objects.begin(), data.objects.end(), [](const Object & a, const Object & b) { return a.address_begin < b.address_begin; });
std::sort(data.symbols.begin(), data.symbols.end(), [](const Symbol & a, const Symbol & b) { return a.address_begin < b.address_begin; });
/// We found symbols both from loaded program headers and from ELF symbol tables.
data.symbols.erase(std::unique(data.symbols.begin(), data.symbols.end(), [](const Symbol & a, const Symbol & b)
{
return a.address_begin == b.address_begin && a.address_end == b.address_end;
}), data.symbols.end());
}
const SymbolIndex::Symbol * SymbolIndex::findSymbol(const void * address) const
{
return find(address, data.symbols);
}
const SymbolIndex::Object * SymbolIndex::findObject(const void * address) const
{
return find(address, data.objects);
}
}

View File

@ -0,0 +1,55 @@
#pragma once
#include <vector>
#include <string>
#include <ext/singleton.h>
#include <Common/Elf.h>
namespace DB
{
/** Allow to quickly find symbol name from address.
* Used as a replacement for "dladdr" function which is extremely slow.
* It works better than "dladdr" because it also allows to search private symbols, that are not participated in shared linking.
*/
class SymbolIndex : public ext::singleton<SymbolIndex>
{
protected:
friend class ext::singleton<SymbolIndex>;
SymbolIndex() { update(); }
public:
struct Symbol
{
const void * address_begin;
const void * address_end;
const char * name;
};
struct Object
{
const void * address_begin;
const void * address_end;
std::string name;
std::unique_ptr<Elf> elf;
};
const Symbol * findSymbol(const void * address) const;
const Object * findObject(const void * address) const;
const std::vector<Symbol> & symbols() const { return data.symbols; }
const std::vector<Object> & objects() const { return data.objects; }
struct Data
{
std::vector<Symbol> symbols;
std::vector<Object> objects;
};
private:
Data data;
void update();
};
}

View File

@ -1,7 +1,6 @@
#include <Common/ThreadPool.h>
#include <Common/Exception.h>
#include <iostream>
#include <type_traits>
@ -13,6 +12,14 @@ namespace DB
}
}
namespace CurrentMetrics
{
extern const Metric GlobalThread;
extern const Metric GlobalThreadActive;
extern const Metric LocalThread;
extern const Metric LocalThreadActive;
}
template <typename Thread>
ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads)
@ -26,6 +33,28 @@ ThreadPoolImpl<Thread>::ThreadPoolImpl(size_t max_threads, size_t max_free_threa
{
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxThreads(size_t value)
{
std::lock_guard lock(mutex);
max_threads = value;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setMaxFreeThreads(size_t value)
{
std::lock_guard lock(mutex);
max_free_threads = value;
}
template <typename Thread>
void ThreadPoolImpl<Thread>::setQueueSize(size_t value)
{
std::lock_guard lock(mutex);
queue_size = value;
}
template <typename Thread>
template <typename ReturnType>
ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::optional<uint64_t> wait_microseconds)
@ -51,7 +80,7 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
auto pred = [this] { return !queue_size || scheduled_jobs < queue_size || shutdown; };
if (wait_microseconds)
if (wait_microseconds) /// Check for optional. Condition is true if the optional is set and the value is zero.
{
if (!job_finished.wait_for(lock, std::chrono::microseconds(*wait_microseconds), pred))
return on_error();
@ -75,6 +104,15 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
catch (...)
{
threads.pop_front();
/// Remove the job and return error to caller.
/// Note that if we have allocated at least one thread, we may continue
/// (one thread is enough to process all jobs).
/// But this condition indicate an error nevertheless and better to refuse.
jobs.pop();
--scheduled_jobs;
return on_error();
}
}
}
@ -148,6 +186,9 @@ size_t ThreadPoolImpl<Thread>::active() const
template <typename Thread>
void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_it)
{
CurrentMetrics::Increment metric_all_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThread : CurrentMetrics::LocalThread);
while (true)
{
Job job;
@ -174,6 +215,9 @@ void ThreadPoolImpl<Thread>::worker(typename std::list<Thread>::iterator thread_
{
try
{
CurrentMetrics::Increment metric_active_threads(
std::is_same_v<Thread, std::thread> ? CurrentMetrics::GlobalThreadActive : CurrentMetrics::LocalThreadActive);
job();
}
catch (...)

View File

@ -60,14 +60,18 @@ public:
/// Returns number of running and scheduled jobs.
size_t active() const;
void setMaxThreads(size_t value);
void setMaxFreeThreads(size_t value);
void setQueueSize(size_t value);
private:
mutable std::mutex mutex;
std::condition_variable job_finished;
std::condition_variable new_job_or_shutdown;
const size_t max_threads;
const size_t max_free_threads;
const size_t queue_size;
size_t max_threads;
size_t max_free_threads;
size_t queue_size;
size_t scheduled_jobs = 0;
bool shutdown = false;

View File

@ -3,7 +3,7 @@
#include <Core/Field.h>
#include <Poco/Logger.h>
#include <common/Pipe.h>
#include <common/StackTrace.h>
#include <Common/StackTrace.h>
#include <common/logger_useful.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromFileDescriptor.h>
@ -46,6 +46,7 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
if (-1 == fcntl(trace_pipe.fds_rw[1], F_SETFL, flags | O_NONBLOCK))
throwFromErrno("Cannot set non-blocking mode of pipe", ErrorCodes::CANNOT_FCNTL);
#if !defined(__FreeBSD__)
/** Increase pipe size to avoid slowdown during fine-grained trace collection.
*/
constexpr int max_pipe_capacity_to_set = 1048576;
@ -57,6 +58,7 @@ TraceCollector::TraceCollector(std::shared_ptr<TraceLog> & trace_log)
throwFromErrno("Cannot increase pipe capacity to " + toString(pipe_size * 2), ErrorCodes::CANNOT_FCNTL);
LOG_TRACE(log, "Pipe capacity is " << formatReadableSizeWithBinarySuffix(std::min(pipe_size, max_pipe_capacity_to_set)));
#endif
thread = ThreadFromGlobalPool(&TraceCollector::run, this);
}
@ -103,25 +105,28 @@ void TraceCollector::run()
break;
std::string query_id;
StackTrace stack_trace(NoCapture{});
TimerType timer_type;
UInt32 thread_number;
readStringBinary(query_id, in);
readPODBinary(stack_trace, in);
readPODBinary(timer_type, in);
readPODBinary(thread_number, in);
const auto size = stack_trace.getSize();
const auto & frames = stack_trace.getFrames();
UInt8 size = 0;
readIntBinary(size, in);
Array trace;
trace.reserve(size);
for (size_t i = 0; i < size; i++)
trace.emplace_back(UInt64(reinterpret_cast<uintptr_t>(frames[i])));
{
uintptr_t addr = 0;
readPODBinary(addr, in);
trace.emplace_back(UInt64(addr));
}
TimerType timer_type;
readPODBinary(timer_type, in);
UInt32 thread_number;
readPODBinary(thread_number, in);
TraceLogElement element{std::time(nullptr), timer_type, thread_number, query_id, trace};
trace_log->add(element);
}
}

View File

@ -4,14 +4,13 @@
#include "TestKeeper.h"
#include <random>
#include <pcg_random.hpp>
#include <functional>
#include <boost/algorithm/string.hpp>
#include <common/logger_useful.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/PODArray.h>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
#include <Common/Exception.h>
#include <Poco/Net/NetException.h>
@ -159,8 +158,7 @@ struct ZooKeeperArgs
}
/// Shuffle the hosts to distribute the load among ZooKeeper nodes.
pcg64 rng(randomSeed());
std::shuffle(hosts_strings.begin(), hosts_strings.end(), rng);
std::shuffle(hosts_strings.begin(), hosts_strings.end(), thread_local_rng);
for (auto & host : hosts_strings)
{

View File

@ -9,5 +9,5 @@
#cmakedefine01 USE_CPUINFO
#cmakedefine01 USE_BROTLI
#cmakedefine01 USE_MIMALLOC
#cmakedefine01 USE_UNWIND
#cmakedefine01 CLICKHOUSE_SPLIT_BINARY

View File

@ -78,3 +78,6 @@ target_link_libraries (stopwatch PRIVATE clickhouse_common_io)
add_executable (mi_malloc_test mi_malloc_test.cpp)
target_link_libraries (mi_malloc_test PRIVATE clickhouse_common_io)
add_executable (symbol_index symbol_index.cpp)
target_link_libraries (symbol_index PRIVATE clickhouse_common_io)

View File

@ -7,7 +7,7 @@
#include <Common/CompactArray.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <boost/filesystem.hpp>
#include <filesystem>
#include <string>
#include <iostream>
#include <fstream>
@ -15,7 +15,7 @@
#include <cstdlib>
#include <port/unistd.h>
namespace fs = boost::filesystem;
namespace fs = std::filesystem;
std::string createTmpPath(const std::string & filename)
{

View File

@ -0,0 +1,89 @@
#include <atomic>
#include <Common/ThreadPool.h>
#include <gtest/gtest.h>
/// Test what happens if local ThreadPool cannot create a ThreadFromGlobalPool.
/// There was a bug: if local ThreadPool cannot allocate even a single thread,
/// the job will be scheduled but never get executed.
TEST(ThreadPool, GlobalFull1)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;
global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);
global_pool.wait();
std::atomic<size_t> counter = 0;
static constexpr size_t num_jobs = capacity + 1;
auto func = [&] { ++counter; while (counter != num_jobs) {} };
ThreadPool pool(num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
++counter;
}
pool.wait();
EXPECT_EQ(counter, num_jobs);
global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}
TEST(ThreadPool, GlobalFull2)
{
GlobalThreadPool & global_pool = GlobalThreadPool::instance();
static constexpr size_t capacity = 5;
global_pool.setMaxThreads(capacity);
global_pool.setMaxFreeThreads(1);
global_pool.setQueueSize(capacity);
/// ThreadFromGlobalPool from local thread pools from previous test case have exited
/// but their threads from global_pool may not have finished (they still have to exit).
/// If we will not wait here, we can get "Cannot schedule a task exception" earlier than we expect in this test.
global_pool.wait();
std::atomic<size_t> counter = 0;
auto func = [&] { ++counter; while (counter != capacity + 1) {} };
ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
++counter;
pool.wait();
global_pool.wait();
for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);
global_pool.setMaxThreads(10000);
global_pool.setMaxFreeThreads(1000);
global_pool.setQueueSize(10000);
}

View File

@ -0,0 +1,58 @@
#include <Common/SymbolIndex.h>
#include <Common/Elf.h>
#include <Common/Dwarf.h>
#include <Core/Defines.h>
#include <common/demangle.h>
#include <iostream>
#include <dlfcn.h>
NO_INLINE const void * getAddress()
{
return __builtin_return_address(0);
}
using namespace DB;
int main(int argc, char ** argv)
{
if (argc < 2)
{
std::cerr << "Usage: ./symbol_index address\n";
return 1;
}
const SymbolIndex & symbol_index = SymbolIndex::instance();
for (const auto & elem : symbol_index.symbols())
std::cout << elem.name << ": " << elem.address_begin << " ... " << elem.address_end << "\n";
std::cout << "\n";
const void * address = reinterpret_cast<void*>(std::stoull(argv[1], nullptr, 16));
auto symbol = symbol_index.findSymbol(address);
if (symbol)
std::cerr << symbol->name << ": " << symbol->address_begin << " ... " << symbol->address_end << "\n";
else
std::cerr << "SymbolIndex: Not found\n";
Dl_info info;
if (dladdr(address, &info) && info.dli_sname)
std::cerr << demangle(info.dli_sname) << ": " << info.dli_saddr << "\n";
else
std::cerr << "dladdr: Not found\n";
auto object = symbol_index.findObject(getAddress());
Dwarf dwarf(*object->elf);
Dwarf::LocationInfo location;
if (dwarf.findAddress(uintptr_t(address), location, Dwarf::LocationInfoMode::FAST))
std::cerr << location.file.toString() << ":" << location.line << "\n";
else
std::cerr << "Dwarf: Not found\n";
std::cerr << "\n";
std::cerr << StackTrace().toString() << "\n";
return 0;
}

View File

@ -0,0 +1,4 @@
#include <Common/thread_local_rng.h>
#include <Common/randomSeed.h>
thread_local pcg64 thread_local_rng{randomSeed()};

View File

@ -0,0 +1,5 @@
#pragma once
#include <pcg_random.hpp>
/// Fairly good thread-safe random number generator, but probably slow-down thread creation a little.
extern thread_local pcg64 thread_local_rng;

View File

@ -1,11 +1,8 @@
#include "CompressedReadBufferBase.h"
#include <vector>
#include <string.h>
#include <city.h>
#include <zstd.h>
#include <Common/PODArray.h>
#include <Common/ProfileEvents.h>
#include <Common/Exception.h>

View File

@ -1,8 +1,5 @@
#include <memory>
#include <city.h>
#include <lz4.h>
#include <lz4hc.h>
#include <zstd.h>
#include <string.h>
#include <common/unaligned.h>

View File

@ -7,7 +7,6 @@
#include <IO/ReadBufferFromFileBase.h>
#include <Common/typeid_cast.h>
#include <Compression/CompressionFactory.h>
#include <zstd.h>
namespace ProfileEvents
{

View File

@ -101,6 +101,7 @@ struct PerformanceStatistics
Element data[NUM_ELEMENTS];
/// It's Ok that generator is not seeded.
pcg64 rng;
/// To select from different algorithms we use a kind of "bandits" algorithm.

View File

@ -140,6 +140,11 @@
/// It could be any magic number.
#define DBMS_DISTRIBUTED_SENDS_MAGIC_NUMBER 0xCAFECABE
#if !__has_include(<sanitizer/asan_interface.h>)
# define ASAN_UNPOISON_MEMORY_REGION(a, b)
# define ASAN_POISON_MEMORY_REGION(a, b)
#endif
/// A macro for suppressing warnings about unused variables or function results.
/// Useful for structured bindings which have no standard way to declare this.
#define UNUSED(...) (void)(__VA_ARGS__)

View File

@ -255,22 +255,20 @@ public:
PacketPayloadWriteBuffer(WriteBuffer & out, size_t payload_length, uint8_t & sequence_id)
: WriteBuffer(out.position(), 0), out(out), sequence_id(sequence_id), total_left(payload_length)
{
startPacket();
startNewPacket();
setWorkingBuffer();
pos = out.position();
}
void checkPayloadSize()
bool remainingPayloadSize()
{
if (bytes_written + offset() < payload_length)
{
std::stringstream ss;
ss << "Incomplete payload. Written " << bytes << " bytes, expected " << payload_length << " bytes.";
throw Exception(ss.str(), 0);
}
return total_left;
}
~PacketPayloadWriteBuffer() override
{ next(); }
{
next();
}
private:
WriteBuffer & out;
@ -279,8 +277,9 @@ private:
size_t total_left = 0;
size_t payload_length = 0;
size_t bytes_written = 0;
bool eof = false;
void startPacket()
void startNewPacket()
{
payload_length = std::min(total_left, MAX_PACKET_LENGTH);
bytes_written = 0;
@ -288,34 +287,38 @@ private:
out.write(reinterpret_cast<char *>(&payload_length), 3);
out.write(sequence_id++);
bytes += 4;
}
/// Sets working buffer to the rest of current packet payload.
void setWorkingBuffer()
{
out.nextIfAtEnd();
working_buffer = WriteBuffer::Buffer(out.position(), out.position() + std::min(payload_length - bytes_written, out.available()));
pos = working_buffer.begin();
if (payload_length - bytes_written == 0)
{
/// Finished writing packet. Due to an implementation of WriteBuffer, working_buffer cannot be empty. Further write attempts will throw Exception.
eof = true;
working_buffer.resize(1);
}
}
protected:
void nextImpl() override
{
int written = pos - working_buffer.begin();
const int written = pos - working_buffer.begin();
if (eof)
throw Exception("Cannot write after end of buffer.", ErrorCodes::CANNOT_WRITE_AFTER_END_OF_BUFFER);
out.position() += written;
bytes_written += written;
if (bytes_written < payload_length)
{
out.nextIfAtEnd();
working_buffer = WriteBuffer::Buffer(out.position(), out.position() + std::min(payload_length - bytes_written, out.available()));
}
else if (total_left > 0 || payload_length == MAX_PACKET_LENGTH)
{
// Starting new packet, since packets of size greater than MAX_PACKET_LENGTH should be split.
startPacket();
}
else
{
// Finished writing packet. Buffer is set to empty to prevent rewriting (pos will be set to the beginning of a working buffer in next()).
// Further attempts to write will stall in the infinite loop.
working_buffer = WriteBuffer::Buffer(out.position(), out.position());
}
/// Packets of size greater than MAX_PACKET_LENGTH are split into few packets of size MAX_PACKET_LENGTH and las packet of size < MAX_PACKET_LENGTH.
if (bytes_written == payload_length && (total_left > 0 || payload_length == MAX_PACKET_LENGTH))
startNewPacket();
setWorkingBuffer();
}
};
@ -327,7 +330,13 @@ public:
{
PacketPayloadWriteBuffer buf(buffer, getPayloadSize(), sequence_id);
writePayloadImpl(buf);
buf.checkPayloadSize();
buf.next();
if (buf.remainingPayloadSize())
{
std::stringstream ss;
ss << "Incomplete payload. Written " << getPayloadSize() - buf.remainingPayloadSize() << " bytes, expected " << getPayloadSize() << " bytes.";
throw Exception(ss.str(), 0);
}
}
virtual ~WritePacket() = default;

View File

@ -171,6 +171,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
M(SettingBool, input_format_null_as_default, false, "For CSV format initialize null fields with default values if data type of this field is not nullable") \
\
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
\
@ -300,6 +301,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingChar, format_csv_delimiter, ',', "The character to be considered as a delimiter in CSV data. If setting with a string, a string has to have a length of 1.") \
M(SettingBool, format_csv_allow_single_quotes, 1, "If it is set to true, allow strings in single quotes.") \
M(SettingBool, format_csv_allow_double_quotes, 1, "If it is set to true, allow strings in double quotes.") \
M(SettingBool, input_format_csv_unquoted_null_literal_as_null, false, "Consider unquoted NULL literal as \N") \
\
M(SettingDateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic' and 'best_effort'.") \
M(SettingBool, log_profile_events, true, "Log query performance statistics into the query_log and query_thread_log.") \
@ -322,7 +324,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, parallel_view_processing, false, "Enables pushing to attached views concurrently instead of sequentially.") \
M(SettingBool, enable_debug_queries, false, "Enables debug queries such as AST.") \
M(SettingBool, enable_unaligned_array_join, false, "Allow ARRAY JOIN with multiple arrays that have different sizes. When this settings is enabled, arrays will be resized to the longest one.") \
M(SettingBool, optimize_pk_order, true, "Enable order by optimization in reading in primary key order.") \
M(SettingBool, optimize_read_in_order, true, "Enable ORDER BY optimization for reading data in corresponding order in MergeTree tables.") \
M(SettingBool, low_cardinality_allow_in_native_format, true, "Use LowCardinality type in Native format. Otherwise, convert LowCardinality columns to ordinary for select query, and convert ordinary columns to required LowCardinality for insert query.") \
M(SettingBool, allow_experimental_multiple_joins_emulation, true, "Emulate multiple joins using subselects") \
M(SettingBool, allow_experimental_cross_to_join_conversion, true, "Convert CROSS JOIN to INNER JOIN if possible") \
@ -334,6 +336,7 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, allow_hyperscan, true, "Allow functions that use Hyperscan library. Disable to avoid potentially long compilation times and excessive resource usage.") \
M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.") \
M(SettingBool, allow_introspection_functions, false, "Allow functions for introspection of ELF and DWARF for query profiling. These functions are slow and may impose security considerations.") \
\
M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value") \

View File

@ -4,6 +4,7 @@
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/FieldVisitors.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteHelpers.h>
@ -26,6 +27,7 @@ namespace ErrorCodes
extern const int SIZE_OF_FIXED_STRING_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_SETTING;
extern const int CANNOT_PARSE_BOOL;
}
@ -63,6 +65,30 @@ void SettingNumber<Type>::set(const String & x)
set(parse<Type>(x));
}
template <>
void SettingNumber<bool>::set(const String & x)
{
if (x.size() == 1)
{
if (x[0] == '0')
set(false);
else if (x[0] == '1')
set(true);
else
throw Exception("Cannot parse bool from string '" + x + "'", ErrorCodes::CANNOT_PARSE_BOOL);
}
else
{
ReadBufferFromString buf(x);
if (checkStringCaseInsensitive("true", buf))
set(true);
else if (checkStringCaseInsensitive("false", buf))
set(false);
else
throw Exception("Cannot parse bool from string '" + x + "'", ErrorCodes::CANNOT_PARSE_BOOL);
}
}
template <typename Type>
void SettingNumber<Type>::serialize(WriteBuffer & buf) const
{

View File

@ -4,6 +4,7 @@
#include <Common/Arena.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <DataTypes/DataTypeCustomSimpleAggregateFunction.h>
#include <DataTypes/DataTypeLowCardinality.h>
namespace DB
@ -15,10 +16,52 @@ namespace ErrorCodes
}
class RemovingLowCardinalityBlockInputStream : public IBlockInputStream
{
public:
RemovingLowCardinalityBlockInputStream(BlockInputStreamPtr input_, ColumnNumbers positions_)
: input(std::move(input_)), positions(std::move(positions_))
{
header = transform(input->getHeader());
}
Block transform(Block block)
{
if (block)
{
for (auto & pos : positions)
{
auto & col = block.safeGetByPosition(pos);
col.column = recursiveRemoveLowCardinality(col.column);
col.type = recursiveRemoveLowCardinality(col.type);
}
}
return block;
}
String getName() const override { return "RemovingLowCardinality"; }
Block getHeader() const override { return header; }
const BlockMissingValues & getMissingValues() const override { return input->getMissingValues(); }
bool isSortedOutput() const override { return input->isSortedOutput(); }
const SortDescription & getSortDescription() const override { return input->getSortDescription(); }
protected:
Block readImpl() override { return transform(input->read()); }
private:
Block header;
BlockInputStreamPtr input;
ColumnNumbers positions;
};
AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
const BlockInputStreams & inputs_, const SortDescription & description_, size_t max_block_size_)
: MergingSortedBlockInputStream(inputs_, description_, max_block_size_)
{
ColumnNumbers positions;
/// Fill in the column numbers that need to be aggregated.
for (size_t i = 0; i < num_columns; ++i)
{
@ -51,6 +94,9 @@ AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
allocatesMemoryInArena = true;
columns_to_simple_aggregate.emplace_back(std::move(desc));
if (recursiveRemoveLowCardinality(column.type).get() != column.type.get())
positions.emplace_back(i);
}
else
{
@ -58,6 +104,14 @@ AggregatingSortedBlockInputStream::AggregatingSortedBlockInputStream(
column_numbers_to_aggregate.push_back(i);
}
}
if (!positions.empty())
{
for (auto & input : children)
input = std::make_shared<RemovingLowCardinalityBlockInputStream>(input, positions);
header = children.at(0)->getHeader();
}
}

View File

@ -36,43 +36,58 @@ Block CubeBlockInputStream::getHeader() const
Block CubeBlockInputStream::readImpl()
{
/** After reading a block from input stream,
/** After reading all blocks from input stream,
* we will calculate all subsets of columns on next iterations of readImpl
* by zeroing columns at positions, where bits are zero in current bitmask.
*/
if (mask)
if (!is_data_read)
{
--mask;
Block cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
if (source_blocks.empty())
return {};
is_data_read = true;
mask = (1 << keys.size()) - 1;
if (source_blocks.size() > 1)
source_block = aggregator.mergeBlocks(source_blocks, false);
else
source_block = std::move(source_blocks.front());
zero_block = source_block.cloneEmpty();
for (auto key : keys)
{
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
}
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
auto finalized = source_block;
finalizeBlock(finalized);
return finalized;
}
source_block = children[0]->read();
if (!source_block)
return source_block;
if (!mask)
return {};
zero_block = source_block.cloneEmpty();
for (auto key : keys)
--mask;
auto cube_block = source_block;
for (size_t i = 0; i < keys.size(); ++i)
{
auto & current = zero_block.getByPosition(key);
current.column = current.column->cloneResized(source_block.rows());
if (!((mask >> i) & 1))
{
size_t pos = keys.size() - i - 1;
auto & current = cube_block.getByPosition(keys[pos]);
current.column = zero_block.getByPosition(keys[pos]).column;
}
}
Block finalized = source_block;
finalizeBlock(finalized);
mask = (1 << keys.size()) - 1;
BlocksList cube_blocks = { cube_block };
Block finalized = aggregator.mergeBlocks(cube_blocks, true);
return finalized;
}
}

View File

@ -36,6 +36,7 @@ private:
UInt32 mask = 0;
Block source_block;
Block zero_block;
bool is_data_read = false;
};
}

View File

@ -255,6 +255,10 @@ static void limitProgressingSpeed(size_t total_progress_size, size_t max_speed_i
if (desired_microseconds > total_elapsed_microseconds)
{
UInt64 sleep_microseconds = desired_microseconds - total_elapsed_microseconds;
/// Never sleep more than one second (it should be enough to limit speed for a reasonable amount, and otherwise it's too easy to make query hang).
sleep_microseconds = std::min(UInt64(1000000), sleep_microseconds);
sleepForMicroseconds(sleep_microseconds);
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_microseconds);
@ -349,7 +353,7 @@ void IBlockInputStream::progressImpl(const Progress & value)
ErrorCodes::TOO_SLOW);
/// If the predicted execution time is longer than `max_execution_time`.
if (limits.max_execution_time != 0 && total_rows)
if (limits.max_execution_time != 0 && total_rows && progress.read_rows)
{
double estimated_execution_time_seconds = elapsed_seconds * (static_cast<double>(total_rows) / progress.read_rows);

View File

@ -1,12 +1,16 @@
#include <DataStreams/AddingDefaultBlockOutputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Parsers/ASTInsertQuery.h>
#include <Common/CurrentThread.h>
#include <Common/setThreadName.h>
#include <Common/getNumberOfPhysicalCPUCores.h>
#include <Common/ThreadPool.h>
#include <Storages/MergeTree/ReplicatedMergeTreeBlockOutputStream.h>
#include <Storages/StorageValues.h>
namespace DB
{
@ -44,13 +48,15 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
auto dependent_table = context.getTable(database_table.first, database_table.second);
auto & materialized_view = dynamic_cast<const StorageMaterializedView &>(*dependent_table);
if (StoragePtr inner_table = materialized_view.tryGetTargetTable())
addTableLock(inner_table->lockStructureForShare(true, context.getCurrentQueryId()));
StoragePtr inner_table = materialized_view.getTargetTable();
auto query = materialized_view.getInnerQuery();
BlockOutputStreamPtr out = std::make_shared<PushingToViewsBlockOutputStream>(
database_table.first, database_table.second, dependent_table, *views_context, ASTPtr());
views.emplace_back(ViewInfo{std::move(query), database_table.first, database_table.second, std::move(out)});
std::unique_ptr<ASTInsertQuery> insert = std::make_unique<ASTInsertQuery>();
insert->database = inner_table->getDatabaseName();
insert->table = inner_table->getTableName();
ASTPtr insert_query_ptr(insert.release());
InterpreterInsertQuery interpreter(insert_query_ptr, *views_context);
BlockIO io = interpreter.execute();
views.emplace_back(ViewInfo{query, database_table.first, database_table.second, io.out});
}
}
@ -173,8 +179,13 @@ void PushingToViewsBlockOutputStream::process(const Block & block, size_t view_n
try
{
BlockInputStreamPtr from = std::make_shared<OneBlockInputStream>(block);
InterpreterSelectQuery select(view.query, *views_context, from);
/// We create a table with the same name as original table and the same alias columns,
/// but it will contain single block (that is INSERT-ed into main table).
/// InterpreterSelectQuery will do processing of alias columns.
Context local_context = *views_context;
local_context.addViewSource(StorageValues::create(storage->getDatabaseName(), storage->getTableName(), storage->getColumns(), block));
InterpreterSelectQuery select(view.query, local_context, SelectQueryOptions());
BlockInputStreamPtr in = std::make_shared<MaterializingBlockInputStream>(select.execute().in);
/// Squashing is needed here because the materialized view query can generate a lot of blocks
/// even when only one block is inserted into the parent table (e.g. if the query is a GROUP BY

View File

@ -33,26 +33,40 @@ Block RollupBlockInputStream::readImpl()
* by zeroing out every column one-by-one and re-merging a block.
*/
if (current_key >= 0)
if (!is_data_read)
{
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
BlocksList source_blocks;
while (auto block = children[0]->read())
source_blocks.push_back(block);
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
if (source_blocks.empty())
return {};
Block finalized = rollup_block;
is_data_read = true;
if (source_blocks.size() > 1)
rollup_block = aggregator.mergeBlocks(source_blocks, false);
else
rollup_block = std::move(source_blocks.front());
current_key = keys.size() - 1;
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
Block block = children[0]->read();
current_key = keys.size() - 1;
if (current_key < 0)
return {};
rollup_block = block;
finalizeBlock(block);
auto & current = rollup_block.getByPosition(keys[current_key]);
current.column = current.column->cloneEmpty()->cloneResized(rollup_block.rows());
--current_key;
return block;
BlocksList rollup_blocks = { rollup_block };
rollup_block = aggregator.mergeBlocks(rollup_blocks, false);
auto finalized = rollup_block;
finalizeBlock(finalized);
return finalized;
}
}

View File

@ -35,6 +35,7 @@ private:
ColumnNumbers keys;
ssize_t current_key = -1;
Block rollup_block;
bool is_data_read = false;
};
}

View File

@ -17,10 +17,12 @@ TTLBlockInputStream::TTLBlockInputStream(
const BlockInputStreamPtr & input_,
const MergeTreeData & storage_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time_)
time_t current_time_,
bool force_)
: storage(storage_)
, data_part(data_part_)
, current_time(current_time_)
, force(force_)
, old_ttl_infos(data_part->ttl_infos)
, log(&Logger::get(storage.getLogName() + " (TTLBlockInputStream)"))
, date_lut(DateLUT::instance())
@ -60,6 +62,10 @@ TTLBlockInputStream::TTLBlockInputStream(
}
}
bool TTLBlockInputStream::isTTLExpired(time_t ttl)
{
return (ttl && (ttl <= current_time));
}
Block TTLBlockInputStream::readImpl()
{
@ -70,13 +76,13 @@ Block TTLBlockInputStream::readImpl()
if (storage.hasTableTTL())
{
/// Skip all data if table ttl is expired for part
if (old_ttl_infos.table_ttl.max <= current_time)
if (isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
}
if (old_ttl_infos.table_ttl.min <= current_time)
if (force || isTTLExpired(old_ttl_infos.table_ttl.min))
removeRowsWithExpiredTableTTL(block);
}
@ -96,15 +102,15 @@ void TTLBlockInputStream::readSuffixImpl()
data_part->empty_columns = std::move(empty_columns);
if (rows_removed)
LOG_INFO(log, "Removed " << rows_removed << " rows with expired ttl from part " << data_part->name);
LOG_INFO(log, "Removed " << rows_removed << " rows with expired TTL from part " << data_part->name);
}
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{
storage.ttl_table_entry.expression->execute(block);
const auto & current = block.getByName(storage.ttl_table_entry.result_column);
const IColumn * ttl_column = current.column.get();
const IColumn * ttl_column =
block.getByName(storage.ttl_table_entry.result_column).column.get();
const auto & column_names = header.getNames();
MutableColumns result_columns;
@ -112,15 +118,14 @@ void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
auto & column_with_type = block.getByName(*it);
const IColumn * values_column = column_with_type.column.get();
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (cur_ttl > current_time)
if (!isTTLExpired(cur_ttl))
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
@ -148,10 +153,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
const auto & old_ttl_info = old_ttl_infos.columns_ttl[name];
auto & new_ttl_info = new_ttl_infos.columns_ttl[name];
if (old_ttl_info.min > current_time)
/// Nothing to do
if (!force && !isTTLExpired(old_ttl_info.min))
continue;
if (old_ttl_info.max <= current_time)
/// Later drop full column
if (isTTLExpired(old_ttl_info.max))
continue;
if (!block.has(ttl_entry.result_column))
@ -166,14 +173,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
const auto & current = block.getByName(ttl_entry.result_column);
const IColumn * ttl_column = current.column.get();
const IColumn * ttl_column = block.getByName(ttl_entry.result_column).column.get();
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (cur_ttl <= current_time)
if (isTTLExpired(cur_ttl))
{
if (default_column)
result_column->insertFrom(*default_column, i);

View File

@ -16,10 +16,11 @@ public:
const BlockInputStreamPtr & input_,
const MergeTreeData & storage_,
const MergeTreeData::MutableDataPartPtr & data_part_,
time_t current_time
time_t current_time,
bool force_
);
String getName() const override { return "TTLBlockInputStream"; }
String getName() const override { return "TTL"; }
Block getHeader() const override { return header; }
@ -36,6 +37,7 @@ private:
const MergeTreeData::MutableDataPartPtr & data_part;
time_t current_time;
bool force;
MergeTreeDataPart::TTLInfos old_ttl_infos;
MergeTreeDataPart::TTLInfos new_ttl_infos;
@ -50,13 +52,14 @@ private:
Block header;
private:
/// Removes values with expired ttl and computes new min_ttl and empty_columns for part
/// Removes values with expired ttl and computes new_ttl_infos and empty_columns for part
void removeValuesWithExpiredColumnTTL(Block & block);
/// Remove rows with expired table ttl and computes new min_ttl for part
/// Removes rows with expired table ttl and computes new ttl_infos for part
void removeRowsWithExpiredTableTTL(Block & block);
UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
bool isTTLExpired(time_t ttl);
};
}

View File

@ -1,6 +1,5 @@
#include <random>
#include <pcg_random.hpp>
#include <Common/randomSeed.h>
#include <Common/thread_local_rng.h>
#include <DataStreams/ConcatBlockInputStream.h>
@ -21,8 +20,7 @@ BlockInputStreams narrowBlockInputStreams(BlockInputStreams & inputs, size_t wid
for (size_t i = 0; i < size; ++i)
distribution[i] = i % width;
pcg64 generator(randomSeed());
std::shuffle(distribution.begin(), distribution.end(), generator);
std::shuffle(distribution.begin(), distribution.end(), thread_local_rng);
for (size_t i = 0; i < size; ++i)
partitions[distribution[i]].push_back(inputs[i]);

View File

@ -272,9 +272,72 @@ void DataTypeNullable::serializeTextCSV(const IColumn & column, size_t row_num,
void DataTypeNullable::deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column,
[&istr] { return checkStringByFirstCharacterAndAssertTheRest("\\N", istr); },
[this, &settings, &istr] (IColumn & nested) { nested_data_type->deserializeAsTextCSV(nested, istr, settings); });
constexpr char const * null_literal = "NULL";
constexpr size_t len = 4;
size_t null_prefix_len = 0;
auto check_for_null = [&istr, &settings, &null_prefix_len]
{
if (checkStringByFirstCharacterAndAssertTheRest("\\N", istr))
return true;
if (!settings.csv.unquoted_null_literal_as_null)
return false;
/// Check for unquoted NULL
while (!istr.eof() && null_prefix_len < len && null_literal[null_prefix_len] == *istr.position())
{
++null_prefix_len;
++istr.position();
}
if (null_prefix_len == len)
return true;
/// Value and "NULL" have common prefix, but value is not "NULL".
/// Restore previous buffer position if possible.
if (null_prefix_len <= istr.offset())
{
istr.position() -= null_prefix_len;
null_prefix_len = 0;
}
return false;
};
auto deserialize_nested = [this, &settings, &istr, &null_prefix_len] (IColumn & nested)
{
if (likely(!null_prefix_len))
nested_data_type->deserializeAsTextCSV(nested, istr, settings);
else
{
/// Previous buffer position was not restored,
/// so we need to prepend extracted characters (rare case)
ReadBufferFromMemory prepend(null_literal, null_prefix_len);
ConcatReadBuffer buf(prepend, istr);
nested_data_type->deserializeAsTextCSV(nested, buf, settings);
/// Check if all extracted characters were read by nested parser and update buffer position
if (null_prefix_len < buf.count())
istr.position() = buf.position();
else if (null_prefix_len > buf.count())
{
/// It can happen only if there is an unquoted string instead of a number
/// or if someone uses 'U' or 'L' as delimiter in CSV.
/// In the first case we cannot continue reading anyway. The second case seems to be unlikely.
if (settings.csv.delimiter == 'U' || settings.csv.delimiter == 'L')
throw DB::Exception("Enabled setting input_format_csv_unquoted_null_literal_as_null may not work correctly "
"with format_csv_delimiter = 'U' or 'L' for large input.", ErrorCodes::CANNOT_READ_ALL_DATA);
WriteBufferFromOwnString parsed_value;
nested_data_type->serializeAsTextCSV(nested, nested.size() - 1, parsed_value, settings);
throw DB::Exception("Error while parsing \"" + std::string(null_literal, null_prefix_len)
+ std::string(istr.position(), std::min(size_t{10}, istr.available())) + "\" as " + getName()
+ " at position " + std::to_string(istr.count()) + ": expected \"NULL\" or " + nested_data_type->getName()
+ ", got \"" + std::string(null_literal, buf.count()) + "\", which was deserialized as \""
+ parsed_value.str() + "\". It seems that input data is ill-formatted.",
ErrorCodes::CANNOT_READ_ALL_DATA);
}
}
};
safeDeserialize(column, check_for_null, deserialize_nested);
}
void DataTypeNullable::serializeText(const IColumn & column, size_t row_num, WriteBuffer & ostr, const FormatSettings & settings) const

View File

@ -61,7 +61,8 @@ public:
* 1. \N
* 2. empty string (without quotes)
* 3. NULL
* Now we support only first.
* We support all of them (however, second variant is supported by CSVRowInputStream, not by deserializeTextCSV).
* (see also input_format_defaults_for_omitted_fields and input_format_csv_unquoted_null_literal_as_null settings)
* In CSV, non-NULL string value, starting with \N characters, must be placed in quotes, to avoid ambiguity.
*/
void deserializeTextCSV(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const override;

View File

@ -463,9 +463,8 @@ struct WhichDataType
{
TypeIndex idx;
/// For late initialization.
WhichDataType()
: idx(TypeIndex::Nothing)
WhichDataType(TypeIndex idx_ = TypeIndex::Nothing)
: idx(idx_)
{}
WhichDataType(const IDataType & data_type)

View File

@ -1,5 +1,6 @@
#include <Core/Defines.h>
#include <IO/ConcatReadBuffer.h>
#include <IO/ReadHelpers.h>
#include <IO/Operators.h>
@ -8,6 +9,8 @@
#include <Formats/FormatFactory.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
@ -96,6 +99,7 @@ CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_,
data_types.resize(num_columns);
column_indexes_by_names.reserve(num_columns);
column_idx_to_nullable_column_idx.resize(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
@ -103,6 +107,16 @@ CSVRowInputStream::CSVRowInputStream(ReadBuffer & istr_, const Block & header_,
data_types[i] = column_info.type;
column_indexes_by_names.emplace(column_info.name, i);
/// If input_format_null_as_default=1 we need ColumnNullable of type DataTypeNullable(nested_type)
/// to parse value as nullable before inserting it in corresponding column of not-nullable type.
/// Constructing temporary column for each row is slow, so we prepare it here
if (format_settings.csv.null_as_default && !column_info.type->isNullable() && column_info.type->canBeInsideNullable())
{
column_idx_to_nullable_column_idx[i] = nullable_columns.size();
nullable_types.emplace_back(std::make_shared<DataTypeNullable>(column_info.type));
nullable_columns.emplace_back(nullable_types.back()->createColumn());
}
}
}
@ -210,38 +224,16 @@ bool CSVRowInputStream::read(MutableColumns & columns, RowReadExtension & ext)
for (size_t file_column = 0; file_column < column_indexes_for_input_fields.size(); ++file_column)
{
const auto & table_column = column_indexes_for_input_fields[file_column];
const bool is_last_file_column =
file_column + 1 == column_indexes_for_input_fields.size();
const bool is_last_file_column = file_column + 1 == column_indexes_for_input_fields.size();
if (table_column)
{
const auto & type = data_types[*table_column];
const bool at_delimiter = *istr.position() == delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
read_columns[*table_column] = false;
skipWhitespacesAndTabs(istr);
read_columns[*table_column] = readField(*columns[*table_column], data_types[*table_column],
is_last_file_column, *table_column);
if (!read_columns[*table_column])
have_default_columns = true;
}
else
{
/// Read the column normally.
read_columns[*table_column] = true;
skipWhitespacesAndTabs(istr);
type->deserializeAsTextCSV(*columns[*table_column], istr,
format_settings);
skipWhitespacesAndTabs(istr);
}
skipWhitespacesAndTabs(istr);
}
else
{
@ -380,7 +372,7 @@ bool OPTIMIZE(1) CSVRowInputStream::parseRowAndPrintDiagnosticInfo(MutableColumn
{
skipWhitespacesAndTabs(istr);
prev_position = istr.position();
current_column_type->deserializeAsTextCSV(*columns[table_column], istr, format_settings);
readField(*columns[table_column], current_column_type, is_last_file_column, table_column);
curr_position = istr.position();
skipWhitespacesAndTabs(istr);
}
@ -520,6 +512,45 @@ void CSVRowInputStream::updateDiagnosticInfo()
pos_of_current_row = istr.position();
}
bool CSVRowInputStream::readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx)
{
const bool at_delimiter = *istr.position() == format_settings.csv.delimiter;
const bool at_last_column_line_end = is_last_file_column
&& (*istr.position() == '\n' || *istr.position() == '\r'
|| istr.eof());
if (format_settings.csv.empty_as_default
&& (at_delimiter || at_last_column_line_end))
{
/// Treat empty unquoted column value as default value, if
/// specified in the settings. Tuple columns might seem
/// problematic, because they are never quoted but still contain
/// commas, which might be also used as delimiters. However,
/// they do not contain empty unquoted fields, so this check
/// works for tuples as well.
return false;
}
else if (column_idx_to_nullable_column_idx[column_idx])
{
/// If value is null but type is not nullable then use default value instead.
const size_t nullable_idx = *column_idx_to_nullable_column_idx[column_idx];
auto & tmp_col = *nullable_columns[nullable_idx];
nullable_types[nullable_idx]->deserializeAsTextCSV(tmp_col, istr, format_settings);
Field value = tmp_col[0];
tmp_col.popBack(1); /// do not store copy of values in memory
if (value.isNull())
return false;
column.insert(value);
return true;
}
else
{
/// Read the column normally.
type->deserializeAsTextCSV(column, istr, format_settings);
return true;
}
}
void registerInputFormatCSV(FormatFactory & factory)
{

View File

@ -67,10 +67,17 @@ private:
char * pos_of_current_row = nullptr;
char * pos_of_prev_row = nullptr;
/// For setting input_format_null_as_default
DataTypes nullable_types;
MutableColumns nullable_columns;
OptionalIndexes column_idx_to_nullable_column_idx;
void updateDiagnosticInfo();
bool parseRowAndPrintDiagnosticInfo(MutableColumns & columns,
WriteBuffer & out, size_t max_length_of_column_name, size_t max_length_of_data_type_name);
bool readField(IColumn & column, const DataTypePtr & type, bool is_last_file_column, size_t column_idx);
};
}

View File

@ -1,8 +1,6 @@
#include "config_formats.h"
#if USE_CAPNP
#include "CapnProtoRowInputStream.h"
#if USE_CAPNP
#include <IO/ReadBuffer.h>
#include <Interpreters/Context.h>
#include <Formats/FormatFactory.h>

View File

@ -41,7 +41,9 @@ static FormatSettings getInputFormatSetting(const Settings & settings)
format_settings.csv.delimiter = settings.format_csv_delimiter;
format_settings.csv.allow_single_quotes = settings.format_csv_allow_single_quotes;
format_settings.csv.allow_double_quotes = settings.format_csv_allow_double_quotes;
format_settings.csv.unquoted_null_literal_as_null = settings.input_format_csv_unquoted_null_literal_as_null;
format_settings.csv.empty_as_default = settings.input_format_defaults_for_omitted_fields;
format_settings.csv.null_as_default = settings.input_format_null_as_default;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
format_settings.with_names_use_header = settings.input_format_with_names_use_header;
format_settings.skip_unknown_fields = settings.input_format_skip_unknown_fields;

View File

@ -27,7 +27,9 @@ struct FormatSettings
char delimiter = ',';
bool allow_single_quotes = true;
bool allow_double_quotes = true;
bool unquoted_null_literal_as_null = false;
bool empty_as_default = false;
bool null_as_default = false;
};
CSV csv;

View File

@ -12,9 +12,9 @@ using namespace MySQLProtocol;
MySQLWireBlockOutputStream::MySQLWireBlockOutputStream(WriteBuffer & buf, const Block & header, Context & context)
: header(header)
, context(context)
, packet_sender(std::make_shared<PacketSender>(buf, context.mysql.sequence_id))
, packet_sender(buf, context.mysql.sequence_id)
{
packet_sender->max_packet_size = context.mysql.max_packet_size;
packet_sender.max_packet_size = context.mysql.max_packet_size;
}
void MySQLWireBlockOutputStream::writePrefix()
@ -22,17 +22,17 @@ void MySQLWireBlockOutputStream::writePrefix()
if (header.columns() == 0)
return;
packet_sender->sendPacket(LengthEncodedNumber(header.columns()));
packet_sender.sendPacket(LengthEncodedNumber(header.columns()));
for (const ColumnWithTypeAndName & column : header.getColumnsWithTypeAndName())
{
ColumnDefinition column_definition(column.name, CharacterSet::binary, 0, ColumnType::MYSQL_TYPE_STRING, 0, 0);
packet_sender->sendPacket(column_definition);
packet_sender.sendPacket(column_definition);
}
if (!(context.mysql.client_capabilities & Capability::CLIENT_DEPRECATE_EOF))
{
packet_sender->sendPacket(EOF_Packet(0, 0));
packet_sender.sendPacket(EOF_Packet(0, 0));
}
}
@ -49,35 +49,37 @@ void MySQLWireBlockOutputStream::write(const Block & block)
column.type->serializeAsText(*column.column.get(), i, ostr, format_settings);
row_packet.appendColumn(std::move(ostr.str()));
}
packet_sender->sendPacket(row_packet);
packet_sender.sendPacket(row_packet);
}
}
void MySQLWireBlockOutputStream::writeSuffix()
{
QueryStatus * process_list_elem = context.getProcessListElement();
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
size_t affected_rows = info.written_rows;
size_t affected_rows = 0;
std::stringstream human_readable_info;
human_readable_info << std::fixed << std::setprecision(3)
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
if (QueryStatus * process_list_elem = context.getProcessListElement())
{
CurrentThread::finalizePerformanceCounters();
QueryStatusInfo info = process_list_elem->getInfo();
affected_rows = info.written_rows;
human_readable_info << std::fixed << std::setprecision(3)
<< "Read " << info.read_rows << " rows, " << formatReadableSizeWithBinarySuffix(info.read_bytes) << " in " << info.elapsed_seconds << " sec., "
<< static_cast<size_t>(info.read_rows / info.elapsed_seconds) << " rows/sec., "
<< formatReadableSizeWithBinarySuffix(info.read_bytes / info.elapsed_seconds) << "/sec.";
}
if (header.columns() == 0)
packet_sender->sendPacket(OK_Packet(0x0, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
packet_sender.sendPacket(OK_Packet(0x0, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
if (context.mysql.client_capabilities & CLIENT_DEPRECATE_EOF)
packet_sender->sendPacket(OK_Packet(0xfe, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
packet_sender.sendPacket(OK_Packet(0xfe, context.mysql.client_capabilities, affected_rows, 0, 0, "", human_readable_info.str()), true);
else
packet_sender->sendPacket(EOF_Packet(0, 0), true);
packet_sender.sendPacket(EOF_Packet(0, 0), true);
}
void MySQLWireBlockOutputStream::flush()
{
packet_sender->out->next();
packet_sender.out->next();
}
}

View File

@ -27,7 +27,7 @@ public:
private:
Block header;
Context & context;
std::shared_ptr<MySQLProtocol::PacketSender> packet_sender;
MySQLProtocol::PacketSender packet_sender;
FormatSettings format_settings;
};

View File

@ -1,7 +1,6 @@
#include "config_formats.h"
#if USE_PARQUET
# include "ParquetBlockInputStream.h"
#include "ParquetBlockInputStream.h"
#if USE_PARQUET
# include <algorithm>
# include <iterator>
# include <vector>

View File

@ -1,7 +1,6 @@
#include "config_formats.h"
#if USE_PARQUET
# include "ParquetBlockOutputStream.h"
#include "ParquetBlockOutputStream.h"
#if USE_PARQUET
// TODO: clean includes
# include <Columns/ColumnDecimal.h>
# include <Columns/ColumnFixedString.h>

View File

@ -1,8 +1,5 @@
#include "config_formats.h"
#if USE_PROTOBUF
#include "ProtobufColumnMatcher.h"
#if USE_PROTOBUF
#include <Common/Exception.h>
#include <google/protobuf/descriptor.h>
#include <google/protobuf/descriptor.pb.h>

View File

@ -9,7 +9,7 @@
#if USE_PROTOBUF
#include <boost/noncopyable.hpp>
#include <Formats/ProtobufColumnMatcher.h>
#include "ProtobufColumnMatcher.h"
#include <IO/ReadBuffer.h>
#include <memory>

View File

@ -1,8 +1,6 @@
#include "config_formats.h"
#if USE_PROTOBUF
#include "ProtobufRowInputStream.h"
#if USE_PROTOBUF
#include <Core/Block.h>
#include <Formats/BlockInputStreamFromRowInputStream.h>
#include <Formats/FormatFactory.h>

View File

@ -7,7 +7,7 @@
#include "config_formats.h"
#if USE_PROTOBUF
#include <Formats/ProtobufColumnMatcher.h>
#include "ProtobufColumnMatcher.h"
#include <IO/WriteBufferFromString.h>
#include <boost/noncopyable.hpp>
#include <Common/PODArray.h>

View File

@ -20,8 +20,10 @@ target_link_libraries(clickhouse_functions
${METROHASH_LIBRARIES}
murmurhash
${BASE64_LIBRARY}
${FASTOPS_LIBRARY}
PRIVATE
${ZLIB_LIBRARIES}
${Boost_FILESYSTEM_LIBRARY}
)
@ -30,7 +32,7 @@ if (OPENSSL_CRYPTO_LIBRARY)
endif()
target_include_directories(clickhouse_functions PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/include)
target_include_directories(clickhouse_functions SYSTEM BEFORE PUBLIC ${DIVIDE_INCLUDE_DIR} ${METROHASH_INCLUDE_DIR})
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${DIVIDE_INCLUDE_DIR} ${METROHASH_INCLUDE_DIR})
if (CONSISTENT_HASHING_INCLUDE_DIR)
target_include_directories (clickhouse_functions PRIVATE ${CONSISTENT_HASHING_INCLUDE_DIR})
@ -47,7 +49,11 @@ if (USE_ICU)
endif ()
if (USE_VECTORCLASS)
target_include_directories (clickhouse_functions SYSTEM BEFORE PUBLIC ${VECTORCLASS_INCLUDE_DIR})
target_include_directories (clickhouse_functions SYSTEM PRIVATE ${VECTORCLASS_INCLUDE_DIR})
endif ()
if (USE_FASTOPS)
target_include_directories (clickhouse_functions SYSTEM PRIVATE ${FASTOPS_INCLUDE_DIR})
endif ()
if (ENABLE_TESTS)

View File

@ -31,6 +31,14 @@
#endif
/** FastOps is a fast vector math library from Michael Parakhin (former Yandex CTO),
* Enabled by default.
*/
#if USE_FASTOPS
#include <fastops/fastops.h>
#endif
namespace DB
{
@ -41,16 +49,14 @@ namespace ErrorCodes
template <typename Impl>
class FunctionMathUnaryFloat64 : public IFunction
class FunctionMathUnary : public IFunction
{
public:
static constexpr auto name = Impl::name;
static FunctionPtr create(const Context &) { return std::make_shared<FunctionMathUnaryFloat64>(); }
static_assert(Impl::rows_per_iteration > 0, "Impl must process at least one row per iteration");
static FunctionPtr create(const Context &) { return std::make_shared<FunctionMathUnary>(); }
private:
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 1; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
@ -60,38 +66,63 @@ private:
throw Exception{"Illegal type " + arg->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
return std::make_shared<DataTypeFloat64>();
/// Integers are converted to Float64.
if (Impl::always_returns_float64 || !isFloat(arg))
return std::make_shared<DataTypeFloat64>();
else
return arg;
}
template <typename T>
static void executeInIterations(const T * src_data, Float64 * dst_data, size_t size)
template <typename T, typename ReturnType>
static void executeInIterations(const T * src_data, ReturnType * dst_data, size_t size)
{
const size_t rows_remaining = size % Impl::rows_per_iteration;
const size_t rows_size = size - rows_remaining;
for (size_t i = 0; i < rows_size; i += Impl::rows_per_iteration)
Impl::execute(&src_data[i], &dst_data[i]);
if (rows_remaining != 0)
if constexpr (Impl::rows_per_iteration == 0)
{
T src_remaining[Impl::rows_per_iteration];
memcpy(src_remaining, &src_data[rows_size], rows_remaining * sizeof(T));
memset(src_remaining + rows_remaining, 0, (Impl::rows_per_iteration - rows_remaining) * sizeof(T));
Float64 dst_remaining[Impl::rows_per_iteration];
/// Process all data as a whole and use FastOps implementation
Impl::execute(src_remaining, dst_remaining);
/// If the argument is integer, convert to Float64 beforehand
if constexpr (!std::is_floating_point_v<T>)
{
PODArray<Float64> tmp_vec(size);
for (size_t i = 0; i < size; ++i)
tmp_vec[i] = src_data[i];
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
Impl::execute(tmp_vec.data(), size, dst_data);
}
else
{
Impl::execute(src_data, size, dst_data);
}
}
else
{
const size_t rows_remaining = size % Impl::rows_per_iteration;
const size_t rows_size = size - rows_remaining;
for (size_t i = 0; i < rows_size; i += Impl::rows_per_iteration)
Impl::execute(&src_data[i], &dst_data[i]);
if (rows_remaining != 0)
{
T src_remaining[Impl::rows_per_iteration];
memcpy(src_remaining, &src_data[rows_size], rows_remaining * sizeof(T));
memset(src_remaining + rows_remaining, 0, (Impl::rows_per_iteration - rows_remaining) * sizeof(T));
ReturnType dst_remaining[Impl::rows_per_iteration];
Impl::execute(src_remaining, dst_remaining);
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(ReturnType));
}
}
}
template <typename T>
template <typename T, typename ReturnType>
static bool execute(Block & block, const ColumnVector<T> * col, const size_t result)
{
const auto & src_data = col->getData();
const size_t size = src_data.size();
auto dst = ColumnVector<Float64>::create();
auto dst = ColumnVector<ReturnType>::create();
auto & dst_data = dst->getData();
dst_data.resize(size);
@ -101,19 +132,19 @@ private:
return true;
}
template <typename T>
template <typename T, typename ReturnType>
static bool execute(Block & block, const ColumnDecimal<T> * col, const size_t result)
{
const auto & src_data = col->getData();
const size_t size = src_data.size();
UInt32 scale = src_data.getScale();
auto dst = ColumnVector<Float64>::create();
auto dst = ColumnVector<ReturnType>::create();
auto & dst_data = dst->getData();
dst_data.resize(size);
for (size_t i = 0; i < size; ++i)
dst_data[i] = convertFromDecimal<DataTypeDecimal<T>, DataTypeNumber<Float64>>(src_data[i], scale);
dst_data[i] = convertFromDecimal<DataTypeDecimal<T>, DataTypeNumber<ReturnType>>(src_data[i], scale);
executeInIterations(dst_data.data(), dst_data.data(), size);
@ -131,10 +162,11 @@ private:
{
using Types = std::decay_t<decltype(types)>;
using Type = typename Types::RightType;
using ReturnType = std::conditional_t<Impl::always_returns_float64 || !std::is_floating_point_v<Type>, Float64, Type>;
using ColVecType = std::conditional_t<IsDecimalNumber<Type>, ColumnDecimal<Type>, ColumnVector<Type>>;
const auto col_vec = checkAndGetColumn<ColVecType>(col.column.get());
return execute<Type>(block, col_vec, result);
return execute<Type, ReturnType>(block, col_vec, result);
};
if (!callOnBasicType<void, true, true, true, false>(col.type->getTypeId(), call))
@ -149,6 +181,7 @@ struct UnaryFunctionPlain
{
static constexpr auto name = Name::name;
static constexpr auto rows_per_iteration = 1;
static constexpr bool always_returns_float64 = true;
template <typename T>
static void execute(const T * src, Float64 * dst)
@ -164,6 +197,7 @@ struct UnaryFunctionVectorized
{
static constexpr auto name = Name::name;
static constexpr auto rows_per_iteration = 2;
static constexpr bool always_returns_float64 = true;
template <typename T>
static void execute(const T * src, Float64 * dst)

Some files were not shown because too many files have changed in this diff Show More