Merge branch 'master' into reorganiza-contrib-ide-folders

commit 68a08ffed9
Denis Glazachev, 2021-10-17 22:13:11 +04:00; committed by GitHub
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
446 changed files with 4325 additions and 3296 deletions

.gitmodules vendored

@ -213,6 +213,7 @@
[submodule "contrib/boringssl"]
path = contrib/boringssl
url = https://github.com/ClickHouse-Extras/boringssl.git
branch = MergeWithUpstream
[submodule "contrib/NuRaft"]
path = contrib/NuRaft
url = https://github.com/ClickHouse-Extras/NuRaft.git
@ -249,6 +250,9 @@
[submodule "contrib/magic_enum"]
path = contrib/magic_enum
url = https://github.com/Neargye/magic_enum
[submodule "contrib/libprotobuf-mutator"]
path = contrib/libprotobuf-mutator
url = https://github.com/google/libprotobuf-mutator
[submodule "contrib/sysroot"]
path = contrib/sysroot
url = https://github.com/ClickHouse-Extras/sysroot.git


@ -136,6 +136,21 @@ if (ENABLE_FUZZING)
message (STATUS "Fuzzing instrumentation enabled")
set (FUZZER "libfuzzer")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -nostdlib++")
set (ENABLE_CLICKHOUSE_ODBC_BRIDGE OFF)
set (ENABLE_LIBRARIES 0)
set (ENABLE_SSL 1)
set (USE_INTERNAL_SSL_LIBRARY 1)
set (USE_UNWIND ON)
set (ENABLE_EMBEDDED_COMPILER 0)
set (ENABLE_EXAMPLES 0)
set (ENABLE_UTILS 0)
set (ENABLE_THINLTO 0)
set (ENABLE_TCMALLOC 0)
set (ENABLE_JEMALLOC 0)
set (ENABLE_CHECK_HEAVY_BUILDS 1)
set (GLIBC_COMPATIBILITY OFF)
set (ENABLE_PROTOBUF ON)
set (USE_INTERNAL_PROTOBUF_LIBRARY ON)
endif()
# Global libraries
@ -188,7 +203,7 @@ endif ()
option(ENABLE_TESTS "Provide unit_test_dbms target with Google.Test unit tests" ON)
option(ENABLE_EXAMPLES "Build all example programs in 'examples' subdirectories" OFF)
if (OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64) AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND CMAKE_VERSION VERSION_GREATER "3.9.0")
if (OS_LINUX AND (ARCH_AMD64 OR ARCH_AARCH64) AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND NOT USE_MUSL)
# Only for Linux, x86_64 or aarch64.
option(GLIBC_COMPATIBILITY "Enable compatibility with older glibc libraries." ON)
elseif(GLIBC_COMPATIBILITY)
@ -203,10 +218,6 @@ if (GLIBC_COMPATIBILITY)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -include ${CMAKE_CURRENT_SOURCE_DIR}/base/glibc-compatibility/glibc-compat-2.32.h")
endif()
if (NOT CMAKE_VERSION VERSION_GREATER "3.9.0")
message (WARNING "CMake version must be greater than 3.9.0 for production builds.")
endif ()
# Make sure the final executable has symbols exported
set (CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} -rdynamic")
@ -582,6 +593,7 @@ include (cmake/find/cassandra.cmake)
include (cmake/find/sentry.cmake)
include (cmake/find/stats.cmake)
include (cmake/find/datasketches.cmake)
include (cmake/find/libprotobuf-mutator.cmake)
set (USE_INTERNAL_CITYHASH_LIBRARY ON CACHE INTERNAL "")
find_contrib_lib(cityhash)


@ -5,6 +5,10 @@
#include <string.h>
#include <unistd.h>
#include <sys/select.h>
#include <sys/time.h>
#include <sys/types.h>
#ifdef OS_LINUX
/// We can detect whether the code is linked with one readline variant or another, or open the library dynamically.


@ -6,7 +6,7 @@
#include <base/defines.h>
#if defined(__linux__) && !defined(THREAD_SANITIZER)
#if defined(__linux__) && !defined(THREAD_SANITIZER) && !defined(USE_MUSL)
#define USE_PHDR_CACHE 1
#endif


@ -84,7 +84,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, log_file);
log->setLevel(log_level);
split->addChannel(log);
split->addChannel(log, "log");
}
const auto errorlog_path = config.getString("logger.errorlog", "");
@ -116,7 +116,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
Poco::AutoPtr<DB::OwnFormattingChannel> errorlog = new DB::OwnFormattingChannel(pf, error_log_file);
errorlog->setLevel(errorlog_level);
errorlog->open();
split->addChannel(errorlog);
split->addChannel(errorlog, "errorlog");
}
if (config.getBool("logger.use_syslog", false))
@ -155,7 +155,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, syslog_channel);
log->setLevel(syslog_level);
split->addChannel(log);
split->addChannel(log, "syslog");
}
bool should_log_to_console = isatty(STDIN_FILENO) || isatty(STDERR_FILENO);
@ -177,7 +177,7 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
Poco::AutoPtr<DB::OwnFormattingChannel> log = new DB::OwnFormattingChannel(pf, new Poco::ConsoleChannel);
logger.warning("Logging " + console_log_level_string + " to console");
log->setLevel(console_log_level);
split->addChannel(log);
split->addChannel(log, "console");
}
split->open();
@ -224,6 +224,89 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
}
}
void Loggers::updateLevels(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger)
{
int max_log_level = 0;
const auto log_level_string = config.getString("logger.level", "trace");
int log_level = Poco::Logger::parseLevel(log_level_string);
if (log_level > max_log_level)
max_log_level = log_level;
const auto log_path = config.getString("logger.log", "");
if (!log_path.empty())
split->setLevel("log", log_level);
else
split->setLevel("log", 0);
// Set level to console
bool is_daemon = config.getBool("application.runAsDaemon", false);
bool should_log_to_console = isatty(STDIN_FILENO) || isatty(STDERR_FILENO);
if (config.getBool("logger.console", false)
|| (!config.hasProperty("logger.console") && !is_daemon && should_log_to_console))
split->setLevel("console", log_level);
else
split->setLevel("console", 0);
// Set level to errorlog
int errorlog_level = 0;
const auto errorlog_path = config.getString("logger.errorlog", "");
if (!errorlog_path.empty())
{
errorlog_level = Poco::Logger::parseLevel(config.getString("logger.errorlog_level", "notice"));
if (errorlog_level > max_log_level)
max_log_level = errorlog_level;
}
split->setLevel("errorlog", errorlog_level);
// Set level to syslog
int syslog_level = 0;
if (config.getBool("logger.use_syslog", false))
{
syslog_level = Poco::Logger::parseLevel(config.getString("logger.syslog_level", log_level_string));
if (syslog_level > max_log_level)
max_log_level = syslog_level;
}
split->setLevel("syslog", syslog_level);
// Global logging level (it can be overridden for specific loggers).
logger.setLevel(max_log_level);
// Set level to all already created loggers
std::vector<std::string> names;
logger.root().names(names);
for (const auto & name : names)
logger.root().get(name).setLevel(max_log_level);
logger.root().setLevel(max_log_level);
// Explicitly specified log levels for specific loggers.
{
Poco::Util::AbstractConfiguration::Keys loggers_level;
config.keys("logger.levels", loggers_level);
if (!loggers_level.empty())
{
for (const auto & key : loggers_level)
{
if (key == "logger" || key.starts_with("logger["))
{
const std::string name(config.getString("logger.levels." + key + ".name"));
const std::string level(config.getString("logger.levels." + key + ".level"));
logger.root().get(name).setLevel(level);
}
else
{
// Legacy syntax
const std::string level(config.getString("logger.levels." + key, "trace"));
logger.root().get(key).setLevel(level);
}
}
}
}
}
void Loggers::closeLogs(Poco::Logger & logger)
{
if (log_file)
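A note on the `updateLevels()` hunk above: `max_log_level` is tracked because Poco filters a message at the logger before it ever reaches the split channel, so the logger itself must stay as permissive as the most verbose sink; each named channel then filters back down to its own level. A minimal sketch of that two-stage filter, with simplified, hypothetical types (Poco priorities are numeric, lower = more severe):

``` cpp
#include <algorithm>
#include <vector>

// Hypothetical, simplified model of the two-stage filtering used above.
// In Poco, priorities are numeric and lower means more severe
// (PRIO_FATAL = 1 ... PRIO_TRACE = 8).
struct Sink
{
    int level = 0; // per-channel level, as set by setLevel(name, level)
};

// The logger's effective level is the maximum over all sinks (max_log_level
// above), so no sink is starved of messages it still wants to see.
int effectiveLoggerLevel(const std::vector<Sink> & sinks)
{
    int max_level = 0;
    for (const auto & sink : sinks)
        max_level = std::max(max_level, sink.level);
    return max_level;
}

// Each sink then applies its own, possibly stricter, filter.
bool sinkAccepts(const Sink & sink, int message_priority)
{
    return message_priority <= sink.level;
}
```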


@ -19,6 +19,8 @@ class Loggers
public:
void buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger, const std::string & cmd_name = "");
void updateLevels(Poco::Util::AbstractConfiguration & config, Poco::Logger & logger);
/// Close log files. On next log write files will be reopened.
void closeLogs(Poco::Logger & logger);


@ -1,4 +1,5 @@
#pragma once
#include <atomic>
#include <Poco/AutoPtr.h>
#include <Poco/Channel.h>
#include <Poco/FormattingChannel.h>
@ -14,7 +15,7 @@ class OwnFormattingChannel : public Poco::Channel, public ExtendedLogChannel
public:
explicit OwnFormattingChannel(
Poco::AutoPtr<OwnPatternFormatter> pFormatter_ = nullptr, Poco::AutoPtr<Poco::Channel> pChannel_ = nullptr)
: pFormatter(std::move(pFormatter_)), pChannel(std::move(pChannel_))
: pFormatter(std::move(pFormatter_)), pChannel(std::move(pChannel_)), priority(Poco::Message::PRIO_TRACE)
{
}
@ -45,7 +46,7 @@ public:
private:
Poco::AutoPtr<OwnPatternFormatter> pFormatter;
Poco::AutoPtr<Poco::Channel> pChannel;
Poco::Message::Priority priority = Poco::Message::PRIO_TRACE;
std::atomic<Poco::Message::Priority> priority;
};
}
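Presumably the `priority` member becomes `std::atomic` because, with `updateLevels()` in place, a configuration reload can now call `setLevel()` while other threads are logging through the channel; the atomic makes that cross-thread read/write well defined. A hedged sketch of the access pattern, simplified to `int`:

``` cpp
#include <atomic>

// Simplified model: the reload thread retargets the level while logging
// threads concurrently read it; std::atomic makes this race-free.
struct FormattingChannel
{
    std::atomic<int> priority{8}; // Poco's PRIO_TRACE, the most verbose level

    void setLevel(int level) { priority.store(level, std::memory_order_relaxed); }

    bool shouldLog(int message_priority) const
    {
        return message_priority <= priority.load(std::memory_order_relaxed);
    }
};
```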


@ -1,4 +1,5 @@
#include "OwnSplitChannel.h"
#include "OwnFormattingChannel.h"
#include <iostream>
#include <Core/Block.h>
@ -75,7 +76,7 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);
/// Log data to child channels
for (auto & channel : channels)
for (auto & [name, channel] : channels)
{
if (channel.second)
channel.second->logExtended(msg_ext); // extended child
@ -137,9 +138,9 @@ void OwnSplitChannel::logSplit(const Poco::Message & msg)
}
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel)
void OwnSplitChannel::addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name)
{
channels.emplace_back(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get()));
channels.emplace(name, ExtendedChannelPtrPair(std::move(channel), dynamic_cast<ExtendedLogChannel *>(channel.get())));
}
void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority)
@ -149,4 +150,14 @@ void OwnSplitChannel::addTextLog(std::shared_ptr<DB::TextLog> log, int max_prior
text_log_max_priority.store(max_priority, std::memory_order_relaxed);
}
void OwnSplitChannel::setLevel(const std::string & name, int level)
{
auto it = channels.find(name);
if (it != channels.end())
{
if (auto * channel = dynamic_cast<DB::OwnFormattingChannel *>(it->second.first.get()))
channel->setLevel(level);
}
}
}
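A self-contained analogue of what this hunk changes (hypothetical names, not ClickHouse code): keying channels by name is what lets `setLevel()` retarget exactly one sink later, which the anonymous `std::vector` could not support:

``` cpp
#include <iostream>
#include <map>
#include <memory>
#include <string>

// Toy stand-ins for Poco channels; lower priority value = more severe.
struct Channel
{
    explicit Channel(std::string tag_) : tag(std::move(tag_)) {}

    void log(int priority, const std::string & text) const
    {
        if (priority <= level)
            std::cout << tag << ": " << text << '\n';
    }

    std::string tag;
    int level = 8; // accept everything up to PRIO_TRACE by default
};

class SplitChannel
{
public:
    // Channels are registered under a name, as in the hunk above.
    void addChannel(std::shared_ptr<Channel> channel, const std::string & name)
    {
        channels.emplace(name, std::move(channel));
    }

    // The name makes it possible to retune a single sink after construction.
    void setLevel(const std::string & name, int level)
    {
        if (auto it = channels.find(name); it != channels.end())
            it->second->level = level;
    }

    void log(int priority, const std::string & text) const
    {
        for (const auto & [name, channel] : channels)
            channel->log(priority, text);
    }

private:
    std::map<std::string, std::shared_ptr<Channel>> channels;
};

int main()
{
    SplitChannel split;
    split.addChannel(std::make_shared<Channel>("console"), "console");
    split.addChannel(std::make_shared<Channel>("log"), "log");

    split.setLevel("console", 0);              // mute the console sink only
    split.log(4, "warning reaches the file log only");
}
```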


@ -18,10 +18,12 @@ public:
/// Makes an extended message from msg and passes it to the client logs queue and child (if possible)
void log(const Poco::Message & msg) override;
/// Adds a child channel
void addChannel(Poco::AutoPtr<Poco::Channel> channel);
void addChannel(Poco::AutoPtr<Poco::Channel> channel, const std::string & name);
void addTextLog(std::shared_ptr<DB::TextLog> log, int max_priority);
void setLevel(const std::string & name, int level);
private:
void logSplit(const Poco::Message & msg);
void tryLogSplit(const Poco::Message & msg);
@ -29,7 +31,7 @@ private:
using ChannelPtr = Poco::AutoPtr<Poco::Channel>;
/// Handler and its pointer casted to extended interface
using ExtendedChannelPtrPair = std::pair<ChannelPtr, ExtendedLogChannel *>;
std::vector<ExtendedChannelPtrPair> channels;
std::map<std::string, ExtendedChannelPtrPair> channels;
std::mutex text_log_mutex;


@ -0,0 +1,11 @@
option(USE_LIBPROTOBUF_MUTATOR "Enable libprotobuf-mutator" ${ENABLE_FUZZING})
if (NOT USE_LIBPROTOBUF_MUTATOR)
return()
endif()
set(LibProtobufMutator_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/libprotobuf-mutator")
if (NOT EXISTS "${LibProtobufMutator_SOURCE_DIR}/README.md")
message (FATAL_ERROR "Submodule contrib/libprotobuf-mutator is missing. To fix, try running:\n git submodule update --init --recursive")
endif()


@ -14,6 +14,8 @@ endif ()
if (OS_ANDROID)
# pthread and rt are included in libc
set (DEFAULT_LIBS "${DEFAULT_LIBS} ${BUILTINS_LIBRARY} ${COVERAGE_OPTION} -lc -lm -ldl")
elseif (USE_MUSL)
set (DEFAULT_LIBS "${DEFAULT_LIBS} ${BUILTINS_LIBRARY} ${COVERAGE_OPTION} -static -lc")
else ()
set (DEFAULT_LIBS "${DEFAULT_LIBS} ${BUILTINS_LIBRARY} ${COVERAGE_OPTION} -lc -lm -lrt -lpthread -ldl")
endif ()
@ -26,7 +28,7 @@ set(CMAKE_C_STANDARD_LIBRARIES ${DEFAULT_LIBS})
# glibc-compatibility library relies to constant version of libc headers
# (because minor changes in function attributes between different glibc versions will introduce incompatibilities)
# This is for x86_64. For other architectures we have separate toolchains.
if (ARCH_AMD64 AND NOT_UNBUNDLED)
if (ARCH_AMD64 AND NOT_UNBUNDLED AND NOT CMAKE_CROSSCOMPILING)
set(CMAKE_C_STANDARD_INCLUDE_DIRECTORIES ${ClickHouse_SOURCE_DIR}/contrib/libc-headers/x86_64-linux-gnu ${ClickHouse_SOURCE_DIR}/contrib/libc-headers)
set(CMAKE_CXX_STANDARD_INCLUDE_DIRECTORIES ${ClickHouse_SOURCE_DIR}/contrib/libc-headers/x86_64-linux-gnu ${ClickHouse_SOURCE_DIR}/contrib/libc-headers)
endif ()
@ -37,8 +39,10 @@ set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads REQUIRED)
if (NOT OS_ANDROID)
if (NOT USE_MUSL)
# Our compatibility layer doesn't build under Android and produces many errors under musl.
add_subdirectory(base/glibc-compatibility)
endif ()
add_subdirectory(base/harmful)
endif ()


@ -0,0 +1,32 @@
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY)
set (CMAKE_SYSTEM_NAME "Linux")
set (CMAKE_SYSTEM_PROCESSOR "riscv64")
set (CMAKE_C_COMPILER_TARGET "riscv64-linux-gnu")
set (CMAKE_CXX_COMPILER_TARGET "riscv64-linux-gnu")
set (CMAKE_ASM_COMPILER_TARGET "riscv64-linux-gnu")
set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-riscv64")
set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}")
find_program (LLVM_AR_PATH NAMES "llvm-ar" "llvm-ar-13" "llvm-ar-12" "llvm-ar-11" "llvm-ar-10" "llvm-ar-9" "llvm-ar-8")
find_program (LLVM_RANLIB_PATH NAMES "llvm-ranlib" "llvm-ranlib-13" "llvm-ranlib-12" "llvm-ranlib-11" "llvm-ranlib-10" "llvm-ranlib-9")
set (CMAKE_AR "${LLVM_AR_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "${LLVM_RANLIB_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (LINKER_NAME "ld.lld" CACHE STRING "" FORCE)
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (CMAKE_SHARED_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)


@ -0,0 +1,35 @@
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY)
set (CMAKE_SYSTEM_NAME "Linux")
set (CMAKE_SYSTEM_PROCESSOR "x86_64")
set (CMAKE_C_COMPILER_TARGET "x86_64-linux-musl")
set (CMAKE_CXX_COMPILER_TARGET "x86_64-linux-musl")
set (CMAKE_ASM_COMPILER_TARGET "x86_64-linux-musl")
set (TOOLCHAIN_PATH "${CMAKE_CURRENT_LIST_DIR}/../../contrib/sysroot/linux-x86_64-musl")
set (CMAKE_SYSROOT "${TOOLCHAIN_PATH}")
find_program (LLVM_AR_PATH NAMES "llvm-ar" "llvm-ar-13" "llvm-ar-12" "llvm-ar-11" "llvm-ar-10" "llvm-ar-9" "llvm-ar-8")
find_program (LLVM_RANLIB_PATH NAMES "llvm-ranlib" "llvm-ranlib-13" "llvm-ranlib-12" "llvm-ranlib-11" "llvm-ranlib-10" "llvm-ranlib-9")
set (CMAKE_AR "${LLVM_AR_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_RANLIB "${LLVM_RANLIB_PATH}" CACHE FILEPATH "" FORCE)
set (CMAKE_C_FLAGS_INIT "${CMAKE_C_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_CXX_FLAGS_INIT "${CMAKE_CXX_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (CMAKE_ASM_FLAGS_INIT "${CMAKE_ASM_FLAGS} --gcc-toolchain=${TOOLCHAIN_PATH}")
set (LINKER_NAME "ld.lld" CACHE STRING "" FORCE)
set (CMAKE_EXE_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (CMAKE_SHARED_LINKER_FLAGS_INIT "-fuse-ld=lld")
set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (USE_MUSL 1)
add_definitions(-DUSE_MUSL=1)


@ -21,6 +21,15 @@ endif()
set_property(DIRECTORY PROPERTY EXCLUDE_FROM_ALL 1)
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT MISSING_INTERNAL_LIBCXX_LIBRARY)
add_subdirectory(libcxxabi-cmake)
add_subdirectory(libcxx-cmake)
endif ()
if (USE_UNWIND)
add_subdirectory(libunwind-cmake)
endif ()
add_subdirectory (abseil-cpp-cmake)
add_subdirectory (magic-enum-cmake)
add_subdirectory (boost-cmake)
@ -38,19 +47,14 @@ add_subdirectory (replxx-cmake)
add_subdirectory (unixodbc-cmake)
add_subdirectory (nanodbc-cmake)
if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT MISSING_INTERNAL_LIBCXX_LIBRARY)
add_subdirectory(libcxxabi-cmake)
add_subdirectory(libcxx-cmake)
endif ()
if (USE_UNWIND)
add_subdirectory(libunwind-cmake)
endif ()
if (USE_INTERNAL_CAPNP_LIBRARY AND NOT MISSING_INTERNAL_CAPNP_LIBRARY)
add_subdirectory(capnproto-cmake)
endif ()
if (ENABLE_FUZZING)
add_subdirectory (libprotobuf-mutator-cmake)
endif()
if (USE_YAML_CPP)
add_subdirectory (yaml-cpp-cmake)
endif()

contrib/boringssl vendored

@ -1 +1 @@
Subproject commit a6a2e2ab3e44d97ce98e51c558e989f211de7eb3
Subproject commit c1e01a441d6db234f4f12e63a7657d1f9e6db9c1


@ -4,7 +4,7 @@
# This file is created by generate_build_files.py and edited accordingly.
cmake_minimum_required(VERSION 3.0)
cmake_minimum_required(VERSION 3.5)
project(BoringSSL LANGUAGES C CXX)
@ -20,12 +20,7 @@ if(CMAKE_COMPILER_IS_GNUCXX OR CLANG)
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-common")
if((CMAKE_C_COMPILER_VERSION VERSION_GREATER "4.8.99") OR CLANG)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c11")
else()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -std=c99")
endif()
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -fno-common -std=c11")
endif()
# pthread_rwlock_t requires a feature flag.
@ -55,7 +50,7 @@ add_definitions(-DBORINGSSL_IMPLEMENTATION)
# builds.
if(NOT OPENSSL_NO_ASM AND CMAKE_OSX_ARCHITECTURES)
list(LENGTH CMAKE_OSX_ARCHITECTURES NUM_ARCHES)
if(NOT ${NUM_ARCHES} EQUAL 1)
if(NOT NUM_ARCHES EQUAL 1)
message(FATAL_ERROR "Universal binaries not supported.")
endif()
list(GET CMAKE_OSX_ARCHITECTURES 0 CMAKE_SYSTEM_PROCESSOR)
@ -78,7 +73,13 @@ elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "AMD64")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "x86")
set(ARCH "x86")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "i386")
# cmake uses `uname -p` to set the system processor, but Solaris
# systems support multiple architectures.
if((${CMAKE_SYSTEM_NAME} STREQUAL "SunOS") AND CMAKE_SIZEOF_VOID_P EQUAL 8)
set(ARCH "x86_64")
else()
set(ARCH "x86")
endif()
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "i686")
set(ARCH "x86")
elseif(${CMAKE_SYSTEM_PROCESSOR} STREQUAL "aarch64")
@ -289,6 +290,21 @@ set(
mac-x86_64/crypto/test/trampoline-x86_64.S
)
set(
CRYPTO_win_aarch64_SOURCES
win-aarch64/crypto/chacha/chacha-armv8.S
win-aarch64/crypto/fipsmodule/aesv8-armx64.S
win-aarch64/crypto/fipsmodule/armv8-mont.S
win-aarch64/crypto/fipsmodule/ghash-neon-armv8.S
win-aarch64/crypto/fipsmodule/ghashv8-armx64.S
win-aarch64/crypto/fipsmodule/sha1-armv8.S
win-aarch64/crypto/fipsmodule/sha256-armv8.S
win-aarch64/crypto/fipsmodule/sha512-armv8.S
win-aarch64/crypto/fipsmodule/vpaes-armv8.S
win-aarch64/crypto/test/trampoline-armv8.S
)
set(
CRYPTO_win_x86_SOURCES
@ -331,9 +347,9 @@ set(
win-x86_64/crypto/test/trampoline-x86_64.asm
)
if(APPLE AND ${ARCH} STREQUAL "aarch64")
if(APPLE AND ARCH STREQUAL "aarch64")
set(CRYPTO_ARCH_SOURCES ${CRYPTO_ios_aarch64_SOURCES})
elseif(APPLE AND ${ARCH} STREQUAL "arm")
elseif(APPLE AND ARCH STREQUAL "arm")
set(CRYPTO_ARCH_SOURCES ${CRYPTO_ios_arm_SOURCES})
elseif(APPLE)
set(CRYPTO_ARCH_SOURCES ${CRYPTO_mac_${ARCH}_SOURCES})
@ -360,6 +376,7 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_object.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_octet.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_print.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_strex.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_strnid.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_time.c"
"${BORINGSSL_SOURCE_DIR}/crypto/asn1/a_type.c"
@ -389,6 +406,7 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/bio/printf.c"
"${BORINGSSL_SOURCE_DIR}/crypto/bio/socket.c"
"${BORINGSSL_SOURCE_DIR}/crypto/bio/socket_helper.c"
"${BORINGSSL_SOURCE_DIR}/crypto/blake2/blake2.c"
"${BORINGSSL_SOURCE_DIR}/crypto/bn_extra/bn_asn1.c"
"${BORINGSSL_SOURCE_DIR}/crypto/bn_extra/convert.c"
"${BORINGSSL_SOURCE_DIR}/crypto/buf/buf.c"
@ -413,6 +431,7 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/conf/conf.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-aarch64-fuchsia.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-aarch64-linux.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-aarch64-win.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-arm-linux.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-arm.c"
"${BORINGSSL_SOURCE_DIR}/crypto/cpu-intel.c"
@ -452,7 +471,6 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/ex_data.c"
"${BORINGSSL_SOURCE_DIR}/crypto/fipsmodule/bcm.c"
"${BORINGSSL_SOURCE_DIR}/crypto/fipsmodule/fips_shared_support.c"
"${BORINGSSL_SOURCE_DIR}/crypto/fipsmodule/is_fips.c"
"${BORINGSSL_SOURCE_DIR}/crypto/hkdf/hkdf.c"
"${BORINGSSL_SOURCE_DIR}/crypto/hpke/hpke.c"
"${BORINGSSL_SOURCE_DIR}/crypto/hrss/hrss.c"
@ -499,13 +517,13 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/trust_token/voprf.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/a_digest.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/a_sign.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/a_strex.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/a_verify.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/algorithm.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/asn1_gen.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/by_dir.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/by_file.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/i2d_pr.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/name_print.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/rsa_pss.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/t_crl.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/t_req.c"
@ -519,7 +537,6 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_ext.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_lu.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_obj.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_r2x.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_req.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_set.c"
"${BORINGSSL_SOURCE_DIR}/crypto/x509/x509_trs.c"
@ -589,6 +606,8 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/ssl/d1_srtp.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/dtls_method.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/dtls_record.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/encrypted_client_hello.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/extensions.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/handoff.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/handshake.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/handshake_client.cc"
@ -611,7 +630,6 @@ add_library(
"${BORINGSSL_SOURCE_DIR}/ssl/ssl_versions.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/ssl_x509.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/t1_enc.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/t1_lib.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/tls13_both.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/tls13_client.cc"
"${BORINGSSL_SOURCE_DIR}/ssl/tls13_enc.cc"
@ -633,6 +651,7 @@ add_executable(
"${BORINGSSL_SOURCE_DIR}/tool/digest.cc"
"${BORINGSSL_SOURCE_DIR}/tool/fd.cc"
"${BORINGSSL_SOURCE_DIR}/tool/file.cc"
"${BORINGSSL_SOURCE_DIR}/tool/generate_ech.cc"
"${BORINGSSL_SOURCE_DIR}/tool/generate_ed25519.cc"
"${BORINGSSL_SOURCE_DIR}/tool/genrsa.cc"
"${BORINGSSL_SOURCE_DIR}/tool/pkcs12.cc"

contrib/fastops vendored

@ -1 +1 @@
Subproject commit 012b777df9e2d145a24800a6c8c3d4a0249bb09e
Subproject commit 1460583af7d13c0e980ce46aec8ee9400314669a


@ -18,8 +18,10 @@
* Define overrides for non-standard allocator-related functions if they are
* present on the system.
*/
#if !defined(USE_MUSL)
#define JEMALLOC_OVERRIDE_MEMALIGN
#define JEMALLOC_OVERRIDE_VALLOC
#endif
/*
* At least Linux omits the "const" in:


@ -1,6 +1,6 @@
// OSX does not have this for system alloc functions, so you will get
// "exception specification in declaration" error.
#if defined(__APPLE__) || defined(__FreeBSD__)
#if defined(__APPLE__) || defined(__FreeBSD__) || defined(USE_MUSL)
# undef JEMALLOC_NOTHROW
# define JEMALLOC_NOTHROW


@ -13,12 +13,14 @@
* Define overrides for non-standard allocator-related functions if they are
* present on the system.
*/
#if !defined(USE_MUSL)
#define JEMALLOC_OVERRIDE___LIBC_CALLOC
#define JEMALLOC_OVERRIDE___LIBC_FREE
#define JEMALLOC_OVERRIDE___LIBC_MALLOC
#define JEMALLOC_OVERRIDE___LIBC_MEMALIGN
#define JEMALLOC_OVERRIDE___LIBC_REALLOC
#define JEMALLOC_OVERRIDE___LIBC_VALLOC
#endif
/* #undef JEMALLOC_OVERRIDE___POSIX_MEMALIGN */
/*


@ -56,6 +56,10 @@ if (USE_UNWIND)
target_compile_definitions(cxx PUBLIC -DSTD_EXCEPTION_HAS_STACK_TRACE=1)
endif ()
if (USE_MUSL)
target_compile_definitions(cxx PUBLIC -D_LIBCPP_HAS_MUSL_LIBC=1)
endif ()
# Override the deduced attribute support that causes error.
if (OS_DARWIN AND COMPILER_GCC)
add_compile_definitions(_LIBCPP_INIT_PRIORITY_MAX)

contrib/libprotobuf-mutator vendored Submodule

@ -0,0 +1 @@
Subproject commit ffd86a32874e5c08a143019aad1aaf0907294c9f


@ -0,0 +1,14 @@
set(LIBRARY_DIR ${ClickHouse_SOURCE_DIR}/contrib/libprotobuf-mutator)
add_library(protobuf-mutator
${LIBRARY_DIR}/src/libfuzzer/libfuzzer_macro.cc
${LIBRARY_DIR}/src/libfuzzer/libfuzzer_mutator.cc
${LIBRARY_DIR}/src/binary_format.cc
${LIBRARY_DIR}/src/mutator.cc
${LIBRARY_DIR}/src/text_format.cc
${LIBRARY_DIR}/src/utf8_fix.cc)
target_include_directories(protobuf-mutator BEFORE PRIVATE "${LIBRARY_DIR}")
target_include_directories(protobuf-mutator BEFORE PRIVATE "${ClickHouse_SOURCE_DIR}/contrib/protobuf/src")
target_link_libraries(protobuf-mutator ${Protobuf_LIBRARY})
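For context, a typical fuzz target built against this library (which `contrib/CMakeLists.txt` now adds only when `ENABLE_FUZZING` is set) would use the `DEFINE_PROTO_FUZZER` entry point from `src/libfuzzer/libfuzzer_macro.h`, made visible by the include directories above. The message type and the function under test below are hypothetical, so this is a sketch rather than code from this commit:

``` cpp
#include <src/libfuzzer/libfuzzer_macro.h>

#include "query_input.pb.h" // hypothetical protobuf describing fuzz inputs

// Hypothetical function under test.
static void ProcessQuery(const fuzz::QueryInput & input)
{
    (void)input; // ... parse and execute `input` ...
}

// libprotobuf-mutator generates and mutates structured QueryInput messages
// instead of raw bytes, so the fuzzer spends its time on valid-shaped inputs.
DEFINE_PROTO_FUZZER(const fuzz::QueryInput & input)
{
    ProcessQuery(input);
}
```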


@ -98,7 +98,9 @@
#define HAVE_BCOPY 1
/* Define to 1 if you have the <bits/types.h> header file. */
#if !defined(USE_MUSL)
#define HAVE_BITS_TYPES_H 1
#endif
/* Define to 1 if you have the `chroot' function. */
#define HAVE_CHROOT 1

contrib/sysroot vendored

@ -1 +1 @@
Subproject commit 002415524b5d14124bb8a61a3ce7ac65774f5479
Subproject commit 6172893931e19b028f9cabb7095a44361be863df


@ -47,13 +47,17 @@ then
fi
URL="https://builds.clickhouse.com/master/${DIR}/clickhouse"
echo
echo "Will download ${URL}"
echo
curl -O "${URL}" && chmod a+x clickhouse &&
echo
echo "Successfully downloaded the ClickHouse binary; you can run it as:
./clickhouse"
if [ "${OS}" = "Linux" ]
then
echo
echo "You can also install it:
sudo ./clickhouse install"
fi


@ -27,10 +27,11 @@ It is recommended to use official pre-compiled `deb` packages for Debian or Ubun
{% include 'install/deb.sh' %}
```
If you want to use the most recent version, replace `stable` with `testing` (this is recommended for your testing environments).
You can replace `stable` with `lts` or `testing` to use different [“release trains”](../faq/operations/production.md) based on your needs.
You can also download and install packages manually from [here](https://repo.clickhouse.com/deb/stable/main/).
#### Packages {#packages}
- `clickhouse-common-static` — Installs ClickHouse compiled binary files.


@ -127,6 +127,7 @@ You can pass parameters to `clickhouse-client` (all parameters have a default va
- `--secure` — If specified, will connect to the server over a secure connection.
- `--history_file` — Path to a file containing command history.
- `--param_<name>` — Value for a [query with parameters](#cli-queries-with-parameters).
- `--hardware-utilization` — Print hardware utilization information in the progress bar.
Since version 20.5, `clickhouse-client` has automatic syntax highlighting (always enabled).


@ -10,7 +10,7 @@ Columns:
- `[]` — All users share the same quota.
- `['user_name']` — Connections with the same user name share the same quota.
- `['ip_address']` — Connections from the same IP share the same quota.
- `['client_key']` — Connections with the same key share the same quota. A key must be explicitly provided by a client. When using [clickhouse-client](../../interfaces/cli.md), pass a key value in the `--quota-key` parameter, or use the `quota_key` parameter in the client configuration file. When using the HTTP interface, use the `X-ClickHouse-Quota` header.
- `['client_key']` — Connections with the same key share the same quota. A key must be explicitly provided by a client. When using [clickhouse-client](../../interfaces/cli.md), pass a key value in the `--quota_key` parameter, or use the `quota_key` parameter in the client configuration file. When using the HTTP interface, use the `X-ClickHouse-Quota` header.
- `['user_name', 'client_key']` — Connections with the same `client_key` share the same quota. If a key isn't provided by a client, the quota is tracked for `user_name`.
- `['client_key', 'ip_address']` — Connections with the same `client_key` share the same quota. If a key isn't provided by a client, the quota is tracked for `ip_address`.
- `durations` ([Array](../../sql-reference/data-types/array.md)([UInt64](../../sql-reference/data-types/int-uint.md))) — Time interval lengths in seconds.


@ -155,6 +155,60 @@ Configuration example:
LAYOUT(COMPLEX_KEY_HASHED())
```
### complex_key_sparse_hashed {#complex-key-sparse-hashed}
This type of storage is for use with composite [keys](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md). Similar to `sparse_hashed`.
Configuration example:
``` xml
<layout>
<complex_key_sparse_hashed />
</layout>
```
``` sql
LAYOUT(COMPLEX_KEY_SPARSE_HASHED())
```
### hashed_array {#dicts-external_dicts_dict_layout-hashed-array}
The dictionary is completely stored in memory. Each attribute is stored in an array. The key attribute is stored as a hash table whose values are indexes into the attribute arrays. The dictionary can contain any number of elements with any identifiers; in practice, the number of keys can reach tens of millions of items.
All types of sources are supported. When updating, data (from a file or from a table) is read in its entirety.
Configuration example:
``` xml
<layout>
<hashed_array>
</hashed_array>
</layout>
```
or
``` sql
LAYOUT(HASHED_ARRAY())
```
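A rough C++ sketch of the layout this section describes, with hypothetical attribute names: one hash table maps each key to a single index, and every attribute lives in its own flat array addressed by that index:

``` cpp
#include <cstdint>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical hashed_array-style storage: the hash table holds only indexes;
// attribute values sit in dense per-attribute arrays.
struct HashedArrayLayout
{
    std::unordered_map<uint64_t, size_t> key_to_index; // the key attribute
    std::vector<std::string> names;                    // attribute "name"
    std::vector<double> prices;                        // attribute "price"

    const std::string * findName(uint64_t key) const
    {
        auto it = key_to_index.find(key);
        return it == key_to_index.end() ? nullptr : &names[it->second];
    }
};
```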
### complex_key_hashed_array {#complex-key-hashed-array}
This type of storage is for use with composite [keys](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-structure.md). Similar to `hashed_array`.
Configuration example:
``` xml
<layout>
<complex_key_hashed_array />
</layout>
```
``` sql
LAYOUT(COMPLEX_KEY_HASHED_ARRAY())
```
### range_hashed {#range-hashed}
The dictionary is stored in memory in the form of a hash table with an ordered array of ranges and their corresponding values.


@ -11,7 +11,7 @@
- `[]` — All users share the same quota.
- `['user_name']` — Connections with the same user name share the same quota.
- `['ip_address']` — Connections from the same IP address share the same quota.
- `['client_key']` — Connections with the same key share the same quota. The key must be explicitly provided by the client. When using [clickhouse-client](../../interfaces/cli.md), pass the key value in the `--quota-key` parameter, or use the `quota_key` parameter in the client configuration file. When using the HTTP interface, use the `X-ClickHouse-Quota` header.
- `['client_key']` — Connections with the same key share the same quota. The key must be explicitly provided by the client. When using [clickhouse-client](../../interfaces/cli.md), pass the key value in the `--quota_key` parameter, or use the `quota_key` parameter in the client configuration file. When using the HTTP interface, use the `X-ClickHouse-Quota` header.
- `['user_name', 'client_key']` — Connections with the same key share the same quota. If the key is not provided by the client, the quota is tracked for `user_name`.
- `['client_key', 'ip_address']` — Connections with the same key share the same quota. If the key is not provided by the client, the quota is tracked for `ip_address`.
- `durations` ([Array](../../sql-reference/data-types/array.md)([UInt64](../../sql-reference/data-types/int-uint.md))) — Lengths of the time intervals for calculating resource consumption, in seconds.


@ -21,8 +21,6 @@
- [`sumMap`](../../sql-reference/aggregate-functions/reference/summap.md#agg_functions-summap)
- [`minMap`](../../sql-reference/aggregate-functions/reference/minmap.md#agg_functions-minmap)
- [`maxMap`](../../sql-reference/aggregate-functions/reference/maxmap.md#agg_functions-maxmap)
- [`argMin`](../../sql-reference/aggregate-functions/reference/argmin.md)
- [`argMax`](../../sql-reference/aggregate-functions/reference/argmax.md)
!!! note "Note"
Values of `SimpleAggregateFunction(func, Type)` are displayed and stored in the same way as `Type`, so the [-Merge](../../sql-reference/aggregate-functions/combinators.md#aggregate_functions_combinators-merge) and [-State](../../sql-reference/aggregate-functions/combinators.md#agg-functions-combinator-state) combinators are not required.


@ -28,7 +28,7 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/UseSSL.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/Context.h>
#include <Client/Connection.h>
#include <Common/InterruptListener.h>
@ -432,7 +432,7 @@ private:
Progress progress;
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
BlockStreamProfileInfo info;
ProfileInfo info;
while (Block block = executor.read())
info.update(block);


@ -9,8 +9,8 @@
#include <IO/ConnectionTimeoutsContext.h>
#include <Interpreters/InterpreterInsertQuery.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Processors/Chain.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/Chain.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/Sources/RemoteSource.h>


@ -57,14 +57,6 @@ std::shared_ptr<ASTStorage> createASTStorageDistributed(
}
BlockInputStreamPtr squashStreamIntoOneBlock(const BlockInputStreamPtr & stream)
{
return std::make_shared<SquashingBlockInputStream>(
stream,
std::numeric_limits<size_t>::max(),
std::numeric_limits<size_t>::max());
}
Block getBlockWithAllStreamData(QueryPipeline pipeline)
{
QueryPipelineBuilder builder;
@ -82,7 +74,6 @@ Block getBlockWithAllStreamData(QueryPipeline pipeline)
return block;
}
bool isExtendedDefinitionStorage(const ASTPtr & storage_ast)
{
const auto & storage = storage_ast->as<ASTStorage &>();


@ -49,9 +49,7 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTExpressionList.h>
#include <Formats/FormatSettings.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/copyData.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <IO/ReadBufferFromString.h>


@ -66,6 +66,7 @@ namespace ErrorCodes
extern const int CANNOT_OPEN_FILE;
extern const int SYSTEM_ERROR;
extern const int NOT_ENOUGH_SPACE;
extern const int NOT_IMPLEMENTED;
extern const int CANNOT_KILL;
}
@ -75,8 +76,18 @@ namespace ErrorCodes
#define HILITE "\033[1m"
#define END_HILITE "\033[0m"
static constexpr auto CLICKHOUSE_BRIDGE_USER = "clickhouse-bridge";
static constexpr auto CLICKHOUSE_BRIDGE_GROUP = "clickhouse-bridge";
#if defined(OS_DARWIN)
/// Until createUser() and createGroup() are implemented, only sudo-less installations are supported (and are the default) on macOS.
static constexpr auto DEFAULT_CLICKHOUSE_SERVER_USER = "";
static constexpr auto DEFAULT_CLICKHOUSE_SERVER_GROUP = "";
static constexpr auto DEFAULT_CLICKHOUSE_BRIDGE_USER = "";
static constexpr auto DEFAULT_CLICKHOUSE_BRIDGE_GROUP = "";
#else
static constexpr auto DEFAULT_CLICKHOUSE_SERVER_USER = "clickhouse";
static constexpr auto DEFAULT_CLICKHOUSE_SERVER_GROUP = "clickhouse";
static constexpr auto DEFAULT_CLICKHOUSE_BRIDGE_USER = "clickhouse-bridge";
static constexpr auto DEFAULT_CLICKHOUSE_BRIDGE_GROUP = "clickhouse-bridge";
#endif
using namespace DB;
namespace po = boost::program_options;
@ -127,20 +138,68 @@ static bool filesEqual(std::string path1, std::string path2)
&& 0 == memcmp(in1.buffer().begin(), in2.buffer().begin(), in1.buffer().size());
}
static void changeOwnership(const String & file_name, const String & user_name, const String & group_name = {}, bool recursive = true)
{
if (!user_name.empty() || !group_name.empty())
{
std::string command = fmt::format("chown {} {}:{} '{}'", (recursive ? "-R" : ""), user_name, group_name, file_name);
fmt::print(" {}\n", command);
executeScript(command);
}
}
static void createGroup(const String & group_name)
{
if (!group_name.empty())
{
#if defined(OS_DARWIN)
// TODO: implement.
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unable to create a group in macOS");
#else
std::string command = fmt::format("groupadd -r {}", group_name);
fmt::print(" {}\n", command);
executeScript(command);
#endif
}
}
static void createUser(const String & user_name, [[maybe_unused]] const String & group_name)
{
if (!user_name.empty())
{
#if defined(OS_DARWIN)
// TODO: implement.
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unable to create a user in macOS");
#else
std::string command = group_name.empty()
? fmt::format("useradd -r --shell /bin/false --home-dir /nonexistent --user-group {}", user_name)
: fmt::format("useradd -r --shell /bin/false --home-dir /nonexistent -g {} {}", group_name, user_name);
fmt::print(" {}\n", command);
executeScript(command);
#endif
}
}
int mainEntryClickHouseInstall(int argc, char ** argv)
{
try
{
po::options_description desc;
desc.add_options()
("help,h", "produce help message")
("prefix", po::value<std::string>()->default_value(""), "prefix for all paths")
("binary-path", po::value<std::string>()->default_value("/usr/bin"), "where to install binaries")
("config-path", po::value<std::string>()->default_value("/etc/clickhouse-server"), "where to install configs")
("log-path", po::value<std::string>()->default_value("/var/log/clickhouse-server"), "where to create log directory")
("data-path", po::value<std::string>()->default_value("/var/lib/clickhouse"), "directory for data")
("pid-path", po::value<std::string>()->default_value("/var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value("clickhouse"), "clickhouse user to create")
("group", po::value<std::string>()->default_value("clickhouse"), "clickhouse group to create")
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("binary-path", po::value<std::string>()->default_value("usr/bin"), "where to install binaries")
("config-path", po::value<std::string>()->default_value("etc/clickhouse-server"), "where to install configs")
("log-path", po::value<std::string>()->default_value("var/log/clickhouse-server"), "where to create log directory")
("data-path", po::value<std::string>()->default_value("var/lib/clickhouse"), "directory for data")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_USER), "clickhouse user to create")
("group", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_GROUP), "clickhouse group to create")
;
po::variables_map options;
@ -153,10 +212,9 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
<< argv[0]
<< " install [options]\n";
std::cout << desc << '\n';
return 1;
}
try
{
/// We need to copy binary to the binary directory.
/// The binary is currently run. We need to obtain its path from procfs (on Linux).
@ -171,6 +229,9 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
if (res != 0)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "Cannot obtain path to the binary");
if (path.back() == '\0')
path.pop_back();
fs::path binary_self_path(path);
#else
fs::path binary_self_path = "/proc/self/exe";
@ -186,8 +247,8 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
/// TODO An option to link instead of copy - useful for developers.
fs::path prefix = fs::path(options["prefix"].as<std::string>());
fs::path bin_dir = prefix / fs::path(options["binary-path"].as<std::string>());
fs::path prefix = options["prefix"].as<std::string>();
fs::path bin_dir = prefix / options["binary-path"].as<std::string>();
fs::path main_bin_path = bin_dir / "clickhouse";
fs::path main_bin_tmp_path = bin_dir / "clickhouse.new";
@ -225,6 +286,12 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
}
else
{
if (!fs::exists(bin_dir))
{
fmt::print("Creating binary directory {}.\n", bin_dir.string());
fs::create_directories(bin_dir);
}
size_t available_space = fs::space(bin_dir).available;
if (available_space < binary_size)
throw Exception(ErrorCodes::NOT_ENOUGH_SPACE, "Not enough space for clickhouse binary in {}, required {}, available {}.",
@ -326,34 +393,18 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
std::string user = options["user"].as<std::string>();
std::string group = options["group"].as<std::string>();
auto create_group = [](const String & group_name)
{
std::string command = fmt::format("groupadd -r {}", group_name);
fmt::print(" {}\n", command);
executeScript(command);
};
if (!group.empty())
{
fmt::print("Creating clickhouse group if it does not exist.\n");
create_group(group);
createGroup(group);
}
else
fmt::print("Will not create clickhouse group");
auto create_user = [](const String & user_name, const String & group_name)
{
std::string command = group_name.empty()
? fmt::format("useradd -r --shell /bin/false --home-dir /nonexistent --user-group {}", user_name)
: fmt::format("useradd -r --shell /bin/false --home-dir /nonexistent -g {} {}", group_name, user_name);
fmt::print(" {}\n", command);
executeScript(command);
};
fmt::print("Will not create a dedicated clickhouse group.\n");
if (!user.empty())
{
fmt::print("Creating clickhouse user if it does not exist.\n");
create_user(user, group);
createUser(user, group);
if (group.empty())
group = user;
@ -361,6 +412,11 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
/// Setting ulimits.
try
{
#if defined(OS_DARWIN)
/// TODO Set ulimits on macOS.
#else
fs::path ulimits_dir = "/etc/security/limits.d";
fs::path ulimits_file = ulimits_dir / fmt::format("{}.conf", user);
fmt::print("Will set ulimits for {} user in {}.\n", user, ulimits_file.string());
@ -374,16 +430,15 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
out.write(ulimits_content.data(), ulimits_content.size());
out.sync();
out.finalize();
#endif
}
catch (...)
{
std::cerr << "Cannot set ulimits: " << getCurrentExceptionMessage(false) << "\n";
}
/// TODO Set ulimits on Mac OS X
}
else
fmt::print("Will not create clickhouse user.\n");
fmt::print("Will not create a dedicated clickhouse user.\n");
/// Creating configuration files and directories.
@ -400,9 +455,9 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
fs::path config_d = config_dir / "config.d";
fs::path users_d = config_dir / "users.d";
std::string log_path = prefix / options["log-path"].as<std::string>();
std::string data_path = prefix / options["data-path"].as<std::string>();
std::string pid_path = prefix / options["pid-path"].as<std::string>();
fs::path log_path = prefix / options["log-path"].as<std::string>();
fs::path data_path = prefix / options["data-path"].as<std::string>();
fs::path pid_path = prefix / options["pid-path"].as<std::string>();
bool has_password_for_default_user = false;
@ -426,12 +481,80 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
fmt::print("There is no default config.xml, you have to download it and place it at {}.\n", main_config_file.string());
}
else
{
{
WriteBufferFromFile out(main_config_file.string());
out.write(main_config_content.data(), main_config_content.size());
out.sync();
out.finalize();
}
/// Override the default paths.
/// Data paths.
{
std::string data_file = config_d / "data-paths.xml";
WriteBufferFromFile out(data_file);
out << "<yandex>\n"
" <path>" << data_path.string() << "</path>\n"
" <tmp_path>" << (data_path / "tmp").string() << "</tmp_path>\n"
" <user_files_path>" << (data_path / "user_files").string() << "</user_files_path>\n"
" <format_schema_path>" << (data_path / "format_schemas").string() << "</format_schema_path>\n"
"</yandex>\n";
out.sync();
out.finalize();
fmt::print("Data path configuration override is saved to file {}.\n", data_file);
}
/// Logger.
{
std::string logger_file = config_d / "logger.xml";
WriteBufferFromFile out(logger_file);
out << "<yandex>\n"
" <logger>\n"
" <log>" << (log_path / "clickhouse-server.log").string() << "</log>\n"
" <errorlog>" << (log_path / "clickhouse-server.err.log").string() << "</errorlog>\n"
" </logger>\n"
"</yandex>\n";
out.sync();
out.finalize();
fmt::print("Log path configuration override is saved to file {}.\n", logger_file);
}
/// User directories.
{
std::string user_directories_file = config_d / "user-directories.xml";
WriteBufferFromFile out(user_directories_file);
out << "<yandex>\n"
" <user_directories>\n"
" <local_directory>\n"
" <path>" << (data_path / "access").string() << "</path>\n"
" </local_directory>\n"
" </user_directories>\n"
"</yandex>\n";
out.sync();
out.finalize();
fmt::print("User directory path configuration override is saved to file {}.\n", user_directories_file);
}
/// OpenSSL.
{
std::string openssl_file = config_d / "openssl.xml";
WriteBufferFromFile out(openssl_file);
out << "<yandex>\n"
" <openSSL>\n"
" <server>\n"
" <certificateFile>" << (config_dir / "server.crt").string() << "</certificateFile>\n"
" <privateKeyFile>" << (config_dir / "server.key").string() << "</privateKeyFile>\n"
" <dhParamsFile>" << (config_dir / "dhparam.pem").string() << "</dhParamsFile>\n"
" </server>\n"
" </openSSL>\n"
"</yandex>\n";
out.sync();
out.finalize();
fmt::print("OpenSSL path configuration override is saved to file {}.\n", openssl_file);
}
}
}
else
{
@ -443,13 +566,13 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
if (configuration->has("path"))
{
data_path = configuration->getString("path");
fmt::print("{} has {} as data path.\n", main_config_file.string(), data_path);
fmt::print("{} has {} as data path.\n", main_config_file.string(), data_path.string());
}
if (configuration->has("logger.log"))
{
log_path = fs::path(configuration->getString("logger.log")).remove_filename();
fmt::print("{} has {} as log path.\n", main_config_file.string(), log_path);
fmt::print("{} has {} as log path.\n", main_config_file.string(), log_path.string());
}
}
@ -485,82 +608,44 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
}
}
auto change_ownership = [](const String & file_name, const String & user_name, const String & group_name)
{
std::string command = fmt::format("chown --recursive {}:{} '{}'", user_name, group_name, file_name);
fmt::print(" {}\n", command);
executeScript(command);
};
/// Chmod and chown configs
change_ownership(config_dir.string(), user, group);
/// Symlink "preprocessed_configs" is created by the server, so "write" is needed.
fs::permissions(config_dir, fs::perms::owner_all, fs::perm_options::replace);
/// Subdirectories, so "execute" is needed.
if (fs::exists(config_d))
fs::permissions(config_d, fs::perms::owner_read | fs::perms::owner_exec, fs::perm_options::replace);
if (fs::exists(users_d))
fs::permissions(users_d, fs::perms::owner_read | fs::perms::owner_exec, fs::perm_options::replace);
/// Readonly.
if (fs::exists(main_config_file))
fs::permissions(main_config_file, fs::perms::owner_read, fs::perm_options::replace);
if (fs::exists(users_config_file))
fs::permissions(users_config_file, fs::perms::owner_read, fs::perm_options::replace);
/// Create directories for data and log.
if (fs::exists(log_path))
{
fmt::print("Log directory {} already exists.\n", log_path);
fmt::print("Log directory {} already exists.\n", log_path.string());
}
else
{
fmt::print("Creating log directory {}.\n", log_path);
fmt::print("Creating log directory {}.\n", log_path.string());
fs::create_directories(log_path);
}
if (fs::exists(data_path))
{
fmt::print("Data directory {} already exists.\n", data_path);
fmt::print("Data directory {} already exists.\n", data_path.string());
}
else
{
fmt::print("Creating data directory {}.\n", data_path);
fmt::print("Creating data directory {}.\n", data_path.string());
fs::create_directories(data_path);
}
if (fs::exists(pid_path))
{
fmt::print("Pid directory {} already exists.\n", pid_path);
fmt::print("Pid directory {} already exists.\n", pid_path.string());
}
else
{
fmt::print("Creating pid directory {}.\n", pid_path);
fmt::print("Creating pid directory {}.\n", pid_path.string());
fs::create_directories(pid_path);
}
/// Chmod and chown data and log directories
{
std::string command = fmt::format("chown --recursive {}:{} '{}'", user, group, log_path);
fmt::print(" {}\n", command);
executeScript(command);
}
changeOwnership(log_path, user, group);
changeOwnership(pid_path, user, group);
{
std::string command = fmt::format("chown --recursive {}:{} '{}'", user, group, pid_path);
fmt::print(" {}\n", command);
executeScript(command);
}
{
/// Not recursive, because there can be a huge number of files and it will be slow.
std::string command = fmt::format("chown {}:{} '{}'", user, group, data_path);
fmt::print(" {}\n", command);
executeScript(command);
}
changeOwnership(data_path, user, group, /* recursive= */ false);
/// All users are allowed to read pid file (for clickhouse status command).
fs::permissions(pid_path, fs::perms::owner_all | fs::perms::group_read | fs::perms::others_read, fs::perm_options::replace);
@ -576,13 +661,13 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
if (fs::exists(odbc_bridge_path) || fs::exists(library_bridge_path))
{
create_group(CLICKHOUSE_BRIDGE_GROUP);
create_user(CLICKHOUSE_BRIDGE_USER, CLICKHOUSE_BRIDGE_GROUP);
createGroup(DEFAULT_CLICKHOUSE_BRIDGE_GROUP);
createUser(DEFAULT_CLICKHOUSE_BRIDGE_USER, DEFAULT_CLICKHOUSE_BRIDGE_GROUP);
if (fs::exists(odbc_bridge_path))
change_ownership(odbc_bridge_path, CLICKHOUSE_BRIDGE_USER, CLICKHOUSE_BRIDGE_GROUP);
changeOwnership(odbc_bridge_path, DEFAULT_CLICKHOUSE_BRIDGE_USER, DEFAULT_CLICKHOUSE_BRIDGE_GROUP);
if (fs::exists(library_bridge_path))
change_ownership(library_bridge_path, CLICKHOUSE_BRIDGE_USER, CLICKHOUSE_BRIDGE_GROUP);
changeOwnership(library_bridge_path, DEFAULT_CLICKHOUSE_BRIDGE_USER, DEFAULT_CLICKHOUSE_BRIDGE_GROUP);
}
bool stdin_is_a_tty = isatty(STDIN_FILENO);
@ -701,6 +786,25 @@ int mainEntryClickHouseInstall(int argc, char ** argv)
}
}
/// Chmod and chown configs
changeOwnership(config_dir, user, group);
/// Symlink "preprocessed_configs" is created by the server, so "write" is needed.
fs::permissions(config_dir, fs::perms::owner_all, fs::perm_options::replace);
/// Subdirectories, so "execute" is needed.
if (fs::exists(config_d))
fs::permissions(config_d, fs::perms::owner_read | fs::perms::owner_exec, fs::perm_options::replace);
if (fs::exists(users_d))
fs::permissions(users_d, fs::perms::owner_read | fs::perms::owner_exec, fs::perm_options::replace);
/// Readonly.
if (fs::exists(main_config_file))
fs::permissions(main_config_file, fs::perms::owner_read, fs::perm_options::replace);
if (fs::exists(users_config_file))
fs::permissions(users_config_file, fs::perms::owner_read, fs::perm_options::replace);
std::string maybe_password;
if (has_password_for_default_user)
maybe_password = " --password";
@ -766,11 +870,7 @@ namespace
/// All users are allowed to read pid file (for clickhouse status command).
fs::permissions(pid_path, fs::perms::owner_all | fs::perms::group_read | fs::perms::others_read, fs::perm_options::replace);
{
std::string command = fmt::format("chown --recursive {} '{}'", user, pid_path.string());
fmt::print(" {}\n", command);
executeScript(command);
}
changeOwnership(pid_path, user);
}
std::string command = fmt::format("{} --config-file {} --pid-file {} --daemon",
@ -973,14 +1073,17 @@ namespace
int mainEntryClickHouseStart(int argc, char ** argv)
{
try
{
po::options_description desc;
desc.add_options()
("help,h", "produce help message")
("binary-path", po::value<std::string>()->default_value("/usr/bin"), "directory with binary")
("config-path", po::value<std::string>()->default_value("/etc/clickhouse-server"), "directory with configs")
("pid-path", po::value<std::string>()->default_value("/var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value("clickhouse"), "clickhouse user")
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("binary-path", po::value<std::string>()->default_value("usr/bin"), "directory with binary")
("config-path", po::value<std::string>()->default_value("etc/clickhouse-server"), "directory with configs")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_USER), "clickhouse user")
;
po::variables_map options;
@ -995,13 +1098,12 @@ int mainEntryClickHouseStart(int argc, char ** argv)
return 1;
}
try
{
std::string user = options["user"].as<std::string>();
fs::path executable = fs::path(options["binary-path"].as<std::string>()) / "clickhouse-server";
fs::path config = fs::path(options["config-path"].as<std::string>()) / "config.xml";
fs::path pid_file = fs::path(options["pid-path"].as<std::string>()) / "clickhouse-server.pid";
fs::path prefix = options["prefix"].as<std::string>();
fs::path executable = prefix / options["binary-path"].as<std::string>() / "clickhouse-server";
fs::path config = prefix / options["config-path"].as<std::string>() / "config.xml";
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
return start(user, executable, config, pid_file);
}
@ -1014,11 +1116,14 @@ int mainEntryClickHouseStart(int argc, char ** argv)
int mainEntryClickHouseStop(int argc, char ** argv)
{
try
{
po::options_description desc;
desc.add_options()
("help,h", "produce help message")
("pid-path", po::value<std::string>()->default_value("/var/run/clickhouse-server"), "directory for pid file")
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("force", po::bool_switch(), "Stop with KILL signal instead of TERM")
;
@ -1034,9 +1139,8 @@ int mainEntryClickHouseStop(int argc, char ** argv)
return 1;
}
try
{
fs::path pid_file = fs::path(options["pid-path"].as<std::string>()) / "clickhouse-server.pid";
fs::path prefix = options["prefix"].as<std::string>();
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
return stop(pid_file, options["force"].as<bool>());
}
@ -1049,11 +1153,14 @@ int mainEntryClickHouseStop(int argc, char ** argv)
int mainEntryClickHouseStatus(int argc, char ** argv)
{
try
{
po::options_description desc;
desc.add_options()
("help,h", "produce help message")
("pid-path", po::value<std::string>()->default_value("/var/run/clickhouse-server"), "directory for pid file")
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
;
po::variables_map options;
@ -1068,29 +1175,33 @@ int mainEntryClickHouseStatus(int argc, char ** argv)
return 1;
}
try
{
fs::path pid_file = fs::path(options["pid-path"].as<std::string>()) / "clickhouse-server.pid";
fs::path prefix = options["prefix"].as<std::string>();
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
isRunning(pid_file);
return 0;
}
catch (...)
{
std::cerr << getCurrentExceptionMessage(false) << '\n';
return getCurrentExceptionCode();
}
return 0;
}
int mainEntryClickHouseRestart(int argc, char ** argv)
{
try
{
po::options_description desc;
desc.add_options()
("help,h", "produce help message")
("binary-path", po::value<std::string>()->default_value("/usr/bin"), "directory with binary")
("config-path", po::value<std::string>()->default_value("/etc/clickhouse-server"), "directory with configs")
("pid-path", po::value<std::string>()->default_value("/var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value("clickhouse"), "clickhouse user")
("prefix", po::value<std::string>()->default_value("/"), "prefix for all paths")
("binary-path", po::value<std::string>()->default_value("usr/bin"), "directory with binary")
("config-path", po::value<std::string>()->default_value("etc/clickhouse-server"), "directory with configs")
("pid-path", po::value<std::string>()->default_value("var/run/clickhouse-server"), "directory for pid file")
("user", po::value<std::string>()->default_value(DEFAULT_CLICKHOUSE_SERVER_USER), "clickhouse user")
("force", po::value<bool>()->default_value(false), "Stop with KILL signal instead of TERM")
;
@ -1106,16 +1217,16 @@ int mainEntryClickHouseRestart(int argc, char ** argv)
return 1;
}
try
{
std::string user = options["user"].as<std::string>();
fs::path executable = fs::path(options["binary-path"].as<std::string>()) / "clickhouse-server";
fs::path config = fs::path(options["config-path"].as<std::string>()) / "config.xml";
fs::path pid_file = fs::path(options["pid-path"].as<std::string>()) / "clickhouse-server.pid";
fs::path prefix = options["prefix"].as<std::string>();
fs::path executable = prefix / options["binary-path"].as<std::string>() / "clickhouse-server";
fs::path config = prefix / options["config-path"].as<std::string>() / "config.xml";
fs::path pid_file = prefix / options["pid-path"].as<std::string>() / "clickhouse-server.pid";
if (int res = stop(pid_file, options["force"].as<bool>()))
return res;
return start(user, executable, config, pid_file);
}
catch (...)

View File

@ -1,7 +1,6 @@
#include "Handlers.h"
#include "SharedLibraryHandlerFactory.h"
#include <DataStreams/copyData.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
#include <IO/WriteHelpers.h>
@ -10,11 +9,13 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Formats/IInputFormat.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <QueryPipeline/Pipe.h>
#include <Server/HTTP/HTMLForm.h>
#include <IO/ReadBufferFromString.h>
@ -189,8 +190,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
ReadBufferFromString read_block_buf(params.get("null_values"));
auto format = getContext()->getInputFormat(FORMAT, read_block_buf, *sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto sample_block_with_nulls = reader->read();
QueryPipeline pipeline(Pipe(std::move(format)));
PullingPipelineExecutor executor(pipeline);
Block sample_block_with_nulls;
executor.pull(sample_block_with_nulls);
LOG_DEBUG(log, "Dictionary sample block with null values: {}", sample_block_with_nulls.dumpStructure());
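
This replacement is the recurring pattern of the diff: rather than wrapping an input format in InputStreamFromInputFormat and calling read(), the format becomes the source of a QueryPipeline that is drained with PullingPipelineExecutor. A condensed before/after sketch, using only the calls visible in these hunks (format construction and context setup assumed):

    // Before: wrap the format in a stream and read blocks from it.
    auto reader = std::make_shared<InputStreamFromInputFormat>(format);
    Block block = reader->read();

    // After: the format is a pipeline source; blocks are pulled by an executor.
    QueryPipeline pipeline(Pipe(std::move(format)));
    PullingPipelineExecutor executor(pipeline);
    Block block;
    executor.pull(block);  // returns false once the pipeline is exhausted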
@ -281,8 +284,10 @@ void LibraryRequestHandler::handleRequest(HTTPServerRequest & request, HTTPServe
auto & read_buf = request.getStream();
auto format = getContext()->getInputFormat(FORMAT, read_buf, *requested_sample_block, DEFAULT_BLOCK_SIZE);
auto reader = std::make_shared<InputStreamFromInputFormat>(format);
auto block = reader->read();
QueryPipeline pipeline(std::move(format));
PullingPipelineExecutor executor(pipeline);
Block block;
executor.pull(block);
auto library_handler = SharedLibraryHandlerFactory::instance().get(dictionary_id);
if (!library_handler)

View File

@ -2,7 +2,6 @@
#include <Common/SharedLibrary.h>
#include <base/logger_useful.h>
#include <DataStreams/OneBlockInputStream.h>
#include "LibraryUtils.h"

View File

@ -32,7 +32,6 @@
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/ReadHelpers.h>
#include <IO/UseSSL.h>
#include <Parsers/parseQuery.h>
#include <Parsers/IAST.h>
#include <base/ErrorHandlers.h>
#include <Functions/registerFunctions.h>
@ -128,10 +127,9 @@ bool LocalServer::executeMultiQuery(const String & all_queries_text)
}
case MultiQueryProcessingStage::PARSING_EXCEPTION:
{
this_query_end = find_first_symbols<'\n'>(this_query_end, all_queries_end);
this_query_begin = this_query_end; /// It's expected syntax error, skip the line
current_exception.reset();
continue;
if (current_exception)
current_exception->rethrow();
return true;
}
case MultiQueryProcessingStage::EXECUTE_QUERY:
{

View File

@ -15,7 +15,7 @@
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeUUID.h>
#include <Interpreters/Context.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/LimitTransform.h>
#include <Common/SipHash.h>
#include <Common/UTF8Helpers.h>
@ -25,7 +25,7 @@
#include <Common/assert_cast.h>
#include <Formats/registerFormats.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Core/Block.h>

View File

@ -4,7 +4,6 @@
#include "ODBCBlockInputStream.h"
#include "ODBCBlockOutputStream.h"
#include "getIdentifierQuote.h"
#include <DataStreams/copyData.h>
#include <DataTypes/DataTypeFactory.h>
#include <Formats/FormatFactory.h>
#include <Server/HTTP/WriteBufferFromHTTPServerResponse.h>
@ -15,9 +14,9 @@
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Net/HTMLForm.h>
#include <Poco/ThreadPool.h>
#include <Processors/Formats/InputStreamFromInputFormat.h>
#include <Processors/QueryPipeline.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/Formats/IInputFormat.h>
#include <base/logger_useful.h>
#include <Server/HTTP/HTMLForm.h>

View File

@ -62,7 +62,7 @@
#include <TableFunctions/registerTableFunctions.h>
#include <Formats/registerFormats.h>
#include <Storages/registerStorages.h>
#include <DataStreams/ConnectionCollector.h>
#include <QueryPipeline/ConnectionCollector.h>
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Config/ConfigReloader.h>
@ -844,7 +844,7 @@ if (ThreadFuzzer::instance().isEffective())
// FIXME logging-related things need synchronization -- see the 'Logger * log' saved
// in a lot of places. For now, disable updating log configuration without server restart.
//setTextLog(global_context->getTextLog());
//buildLoggers(*config, logger());
updateLevels(*config, logger());
global_context->setClustersConfig(config);
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);

View File

@ -1,8 +1,8 @@
#include "LibraryBridgeHelper.h"
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <Dictionaries/DictionarySourceHelpers.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Formats/IInputFormat.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/WriteBufferFromString.h>

View File

@ -49,7 +49,7 @@ add_subdirectory (Backups)
add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)
add_subdirectory (DataStreams)
add_subdirectory (QueryPipeline)
add_subdirectory (DataTypes)
add_subdirectory (Dictionaries)
add_subdirectory (Disks)
@ -185,7 +185,7 @@ add_object_library(clickhouse_backups Backups)
add_object_library(clickhouse_core Core)
add_object_library(clickhouse_core_mysql Core/MySQL)
add_object_library(clickhouse_compression Compression)
add_object_library(clickhouse_datastreams DataStreams)
add_object_library(clickhouse_querypipeline QueryPipeline)
add_object_library(clickhouse_datatypes DataTypes)
add_object_library(clickhouse_datatypes_serializations DataTypes/Serializations)
add_object_library(clickhouse_databases Databases)
@ -214,6 +214,7 @@ add_object_library(clickhouse_processors_transforms Processors/Transforms)
add_object_library(clickhouse_processors_sources Processors/Sources)
add_object_library(clickhouse_processors_sinks Processors/Sinks)
add_object_library(clickhouse_processors_merges Processors/Merges)
add_object_library(clickhouse_processors_ttl Processors/TTL)
add_object_library(clickhouse_processors_merges_algorithms Processors/Merges/Algorithms)
add_object_library(clickhouse_processors_queryplan Processors/QueryPlan)
add_object_library(clickhouse_processors_queryplan_optimizations Processors/QueryPlan/Optimizations)

View File

@ -45,14 +45,13 @@
#include <Processors/Formats/Impl/NullFormat.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/QueryPipeline.h>
#include <QueryPipeline/QueryPipeline.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Processors/Transforms/AddingDefaultsTransform.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <IO/WriteBufferFromOStream.h>
#include <IO/CompressionMethod.h>
#include <DataStreams/InternalTextLogs.h>
#include <DataStreams/materializeBlock.h>
#include <Client/InternalTextLogs.h>
namespace fs = std::filesystem;
@ -294,7 +293,7 @@ void ClientBase::onReceiveExceptionFromServer(std::unique_ptr<Exception> && e)
}
void ClientBase::onProfileInfo(const BlockStreamProfileInfo & profile_info)
void ClientBase::onProfileInfo(const ProfileInfo & profile_info)
{
if (profile_info.hasAppliedLimit() && output_format)
output_format->setRowsBeforeLimit(profile_info.getRowsBeforeLimit());
@ -517,6 +516,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
const size_t poll_interval
= std::max(min_poll_interval, std::min<size_t>(receive_timeout.totalMicroseconds(), default_poll_interval));
bool break_on_timeout = connection->getConnectionType() != IServerConnection::Type::LOCAL;
while (true)
{
Stopwatch receive_watch(CLOCK_MONOTONIC_COARSE);
@ -547,7 +547,7 @@ void ClientBase::receiveResult(ASTPtr parsed_query)
else
{
double elapsed = receive_watch.elapsedSeconds();
if (elapsed > receive_timeout.totalSeconds())
if (break_on_timeout && elapsed > receive_timeout.totalSeconds())
{
std::cout << "Timeout exceeded while receiving data from server."
<< " Waited for " << static_cast<size_t>(elapsed) << " seconds,"
@ -668,7 +668,7 @@ void ClientBase::onEndOfStream()
void ClientBase::onProfileEvents(Block & block)
{
const auto rows = block.rows();
if (rows == 0)
if (rows == 0 || !progress_indication.print_hardware_utilization)
return;
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);
@ -1560,6 +1560,7 @@ void ClientBase::init(int argc, char ** argv)
("ignore-error", "do not stop processing in multiquery mode")
("stacktrace", "print stack traces of exceptions")
("hardware-utilization", "print hardware utilization information in progress bar")
;
addAndCheckOptions(options_description, options, common_arguments);
@ -1626,6 +1627,8 @@ void ClientBase::init(int argc, char ** argv)
config().setBool("verbose", true);
if (options.count("log-level"))
Poco::Logger::root().setLevel(options["log-level"].as<std::string>());
if (options.count("hardware-utilization"))
progress_indication.print_hardware_utilization = true;
query_processing_stage = QueryProcessingStage::fromString(options["stage"].as<std::string>());

View File

@ -112,7 +112,7 @@ private:
void onTotals(Block & block, ASTPtr parsed_query);
void onExtremes(Block & block, ASTPtr parsed_query);
void onReceiveExceptionFromServer(std::unique_ptr<Exception> && e);
void onProfileInfo(const BlockStreamProfileInfo & profile_info);
void onProfileInfo(const ProfileInfo & profile_info);
void onEndOfStream();
void onProfileEvents(Block & block);

View File

@ -10,8 +10,8 @@
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <IO/TimeoutSetter.h>
#include <DataStreams/NativeReader.h>
#include <DataStreams/NativeWriter.h>
#include <Formats/NativeReader.h>
#include <Formats/NativeWriter.h>
#include <Client/Connection.h>
#include <Client/ConnectionParameters.h>
#include <Common/ClickHouseRevision.h>
@ -25,8 +25,8 @@
#include "Core/Block.h"
#include <Interpreters/ClientInfo.h>
#include <Compression/CompressionFactory.h>
#include <Processors/Pipe.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/Pipe.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/ISink.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <pcg_random.hpp>
@ -1017,9 +1017,9 @@ Progress Connection::receiveProgress() const
}
BlockStreamProfileInfo Connection::receiveProfileInfo() const
ProfileInfo Connection::receiveProfileInfo() const
{
BlockStreamProfileInfo profile_info;
ProfileInfo profile_info;
profile_info.read(*in);
return profile_info;
}

View File

@ -60,6 +60,8 @@ public:
~Connection() override;
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::SERVER; }
static ServerConnectionPtr createConnection(const ConnectionParameters & parameters, ContextPtr context);
/// Set throttler of network traffic. One throttler could be used for multiple connections to limit total traffic.
@ -255,7 +257,7 @@ private:
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
BlockStreamProfileInfo receiveProfileInfo() const;
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();
void initBlockInput();

View File

@ -6,10 +6,9 @@
#include <Core/Block.h>
#include <Core/Protocol.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <QueryPipeline/ProfileInfo.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Progress.h>
@ -31,7 +30,7 @@ struct Packet
std::unique_ptr<Exception> exception;
std::vector<String> multistring_message;
Progress progress;
BlockStreamProfileInfo profile_info;
ProfileInfo profile_info;
std::vector<UUID> part_uuids;
Packet() : type(Protocol::Server::Hello) {}
@ -57,6 +56,14 @@ class IServerConnection : boost::noncopyable
public:
virtual ~IServerConnection() = default;
enum class Type
{
SERVER,
LOCAL
};
virtual Type getConnectionType() const = 0;
virtual void setDefaultDatabase(const String & database) = 0;
virtual void getServerVersion(

View File

@ -1,4 +1,4 @@
#include "InternalTextLogs.h"
#include <Client/InternalTextLogs.h>
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Common/typeid_cast.h>

View File

@ -1,6 +1,6 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <IO/WriteBuffer.h>
#include <Core/Block.h>
namespace DB

View File

@ -60,15 +60,15 @@ void LocalConnection::updateProgress(const Progress & value)
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query_,
const String & query_id_,
UInt64,
const String & query,
const String & query_id,
UInt64 stage,
const Settings *,
const ClientInfo *,
bool)
{
query_context = session.makeQueryContext();
query_context->setCurrentQueryId(query_id_);
query_context->setCurrentQueryId(query_id);
if (send_progress)
query_context->setProgressCallback([this] (const Progress & value) { return this->updateProgress(value); });
@ -77,8 +77,9 @@ void LocalConnection::sendQuery(
state.reset();
state.emplace();
state->query_id = query_id_;
state->query = query_;
state->query_id = query_id;
state->query = query;
state->stage = QueryProcessingStage::Enum(stage);
if (send_progress)
state->after_send_progress.restart();

View File

@ -2,7 +2,7 @@
#include "Connection.h"
#include <Interpreters/Context.h>
#include <DataStreams/BlockIO.h>
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
@ -56,6 +56,8 @@ public:
~LocalConnection() override;
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; }
static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false);
void setDefaultDatabase(const String & database) override;
@ -76,7 +78,7 @@ public:
void sendQuery(
const ConnectionTimeouts & timeouts,
const String & query,
const String & query_id_/* = "" */,
const String & query_id/* = "" */,
UInt64 stage/* = QueryProcessingStage::Complete */,
const Settings * settings/* = nullptr */,
const ClientInfo * client_info/* = nullptr */,

View File

@ -1,11 +1,14 @@
#include "ProgressIndication.h"
#include <algorithm>
#include <cstddef>
#include <numeric>
#include <cmath>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <base/types.h>
#include "Common/formatReadable.h"
#include <Common/TerminalSize.h>
#include <Common/UnicodeBar.h>
#include "IO/WriteBufferFromString.h"
#include <Databases/DatabaseMemory.h>
@ -113,16 +116,17 @@ UInt64 ProgressIndication::getApproximateCoresNumber() const
});
}
UInt64 ProgressIndication::getMemoryUsage() const
ProgressIndication::MemoryUsage ProgressIndication::getMemoryUsage() const
{
return std::accumulate(thread_data.cbegin(), thread_data.cend(), ZERO,
[](UInt64 acc, auto const & host_data)
return std::accumulate(thread_data.cbegin(), thread_data.cend(), MemoryUsage{},
[](MemoryUsage const & acc, auto const & host_data)
{
return acc + std::accumulate(host_data.second.cbegin(), host_data.second.cend(), ZERO,
auto host_usage = std::accumulate(host_data.second.cbegin(), host_data.second.cend(), ZERO,
[](UInt64 memory, auto const & data)
{
return memory + data.second.memory_usage;
});
return MemoryUsage{.total = acc.total + host_usage, .max = std::max(acc.max, host_usage)};
});
}
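
The rewritten getMemoryUsage above folds per-thread usage into per-host sums and tracks the per-host maximum in a single pass. A self-contained sketch of the same accumulate-within-accumulate shape (the map types are illustrative stand-ins for HostToThreadTimesMap):

    #include <algorithm>
    #include <cstdint>
    #include <map>
    #include <numeric>
    #include <string>

    struct MemoryUsage { uint64_t total = 0; uint64_t max = 0; };

    MemoryUsage aggregate(const std::map<std::string, std::map<uint64_t, uint64_t>> & thread_data)
    {
        return std::accumulate(thread_data.cbegin(), thread_data.cend(), MemoryUsage{},
            [](MemoryUsage acc, const auto & host_data)
            {
                // Sum memory over all threads of one host.
                uint64_t host_usage = std::accumulate(
                    host_data.second.cbegin(), host_data.second.cend(), uint64_t{0},
                    [](uint64_t mem, const auto & data) { return mem + data.second; });
                return MemoryUsage{acc.total + host_usage, std::max(acc.max, host_usage)};
            });
    }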
@ -189,6 +193,27 @@ void ProgressIndication::writeProgress()
written_progress_chars = message.count() - prefix_size - (strlen(indicator) - 2); /// Don't count invisible output (escape sequences).
// If approximate cores number is known, display it.
auto cores_number = getApproximateCoresNumber();
std::string profiling_msg;
if (cores_number != 0 && print_hardware_utilization)
{
WriteBufferFromOwnString profiling_msg_builder;
// The calculated number of cores may not be accurate,
// so it's better to print min(threads, cores).
UInt64 threads_number = getUsedThreadsCount();
profiling_msg_builder << " Running " << threads_number << " threads on "
<< std::min(cores_number, threads_number) << " cores";
auto [memory_usage, max_host_usage] = getMemoryUsage();
if (memory_usage != 0)
profiling_msg_builder << " with " << formatReadableSizeWithDecimalSuffix(memory_usage) << " RAM used";
if (thread_data.size() > 1 && max_host_usage)
profiling_msg_builder << " total (per host max: " << formatReadableSizeWithDecimalSuffix(max_host_usage) << ")";
profiling_msg_builder << ".";
profiling_msg = profiling_msg_builder.str();
}
/// If the approximate number of rows to process is known, we can display a progress bar and percentage.
if (progress.total_rows_to_read || progress.total_raw_bytes_to_read)
{
@ -215,7 +240,7 @@ void ProgressIndication::writeProgress()
if (show_progress_bar)
{
ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width) - written_progress_chars - strlen(" 99%");
ssize_t width_of_progress_bar = static_cast<ssize_t>(terminal_width) - written_progress_chars - strlen(" 99%") - profiling_msg.length();
if (width_of_progress_bar > 0)
{
std::string bar
@ -231,23 +256,7 @@ void ProgressIndication::writeProgress()
message << ' ' << (99 * current_count / max_count) << '%';
}
// If approximate cores number is known, display it.
auto cores_number = getApproximateCoresNumber();
if (cores_number != 0)
{
// The calculated number of cores may not be accurate,
// so it's better to print min(threads, cores).
UInt64 threads_number = getUsedThreadsCount();
message << " Running " << threads_number << " threads on "
<< std::min(cores_number, threads_number) << " cores";
auto memory_usage = getMemoryUsage();
if (memory_usage != 0)
message << " with " << formatReadableSizeWithDecimalSuffix(memory_usage) << " RAM used.";
else
message << ".";
}
message << profiling_msg;
message << CLEAR_TO_END_OF_LINE;
++increment;
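
Taken together with the new --hardware-utilization client flag earlier in this diff, the profiling_msg assembled here is shown only on request, and the per-host maximum is appended only for distributed runs (thread_data.size() > 1). An illustrative single-host suffix, with made-up numbers, would read:

    Running 8 threads on 8 cores with 1.15 GB RAM used.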

View File

@ -60,13 +60,21 @@ public:
void updateThreadEventData(HostToThreadTimesMap & new_thread_data);
bool print_hardware_utilization = false;
private:
size_t getUsedThreadsCount() const;
UInt64 getApproximateCoresNumber() const;
UInt64 getMemoryUsage() const;
struct MemoryUsage
{
UInt64 total = 0;
UInt64 max = 0;
};
MemoryUsage getMemoryUsage() const;
/// This flag controls whether to show the progress bar. We start showing it after
/// the query has been executing for 0.5 seconds, and is still less than half complete.

View File

@ -126,6 +126,8 @@ QueryProfilerBase<ProfilerImpl>::QueryProfilerBase(const UInt64 thread_id, const
#if defined(OS_FREEBSD)
sev._sigev_un._threadid = thread_id;
#elif defined(USE_MUSL)
sev.sigev_notify_thread_id = thread_id;
#else
sev._sigev_un._tid = thread_id;
#endif
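
For context, the sigevent field set above selects per-thread delivery of the profiling timer's signal; only the field's spelling differs between libcs. A hedged sketch of the surrounding timer_create call (the signal number and clock are illustrative; SIGEV_THREAD_ID may require _GNU_SOURCE on glibc):

    #include <signal.h>
    #include <sys/types.h>
    #include <time.h>

    void createProfilingTimer(pid_t thread_id, timer_t & timer)
    {
        struct sigevent sev {};
        sev.sigev_notify = SIGEV_THREAD_ID;  // deliver to one thread, not the whole process
        sev.sigev_signo = SIGUSR1;           // illustrative choice of signal
    #if defined(USE_MUSL)
        sev.sigev_notify_thread_id = thread_id;
    #else
        sev._sigev_un._tid = thread_id;      // glibc's internal spelling of the same field
    #endif
        timer_create(CLOCK_THREAD_CPUTIME_ID, &sev, &timer);
    }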

View File

@ -1,4 +1,4 @@
#include <DataStreams/SquashingTransform.h>
#include <Common/SquashingTransform.h>
#include <iostream>

View File

@ -17,7 +17,9 @@ extern "C"
void *aligned_alloc(size_t alignment, size_t size);
void *valloc(size_t size);
void *memalign(size_t alignment, size_t size);
#if !defined(USE_MUSL)
void *pvalloc(size_t size);
#endif
}
#pragma GCC diagnostic pop
@ -39,6 +41,8 @@ static void dummyFunctionForInterposing()
ignore(aligned_alloc(0, 0)); // -V575 NOLINT
ignore(valloc(0)); // -V575 NOLINT
ignore(memalign(0, 0)); // -V575 NOLINT
#if !defined(USE_MUSL)
ignore(pvalloc(0)); // -V575 NOLINT
#endif
}
#endif

View File

@ -707,4 +707,27 @@ ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column
return current_column;
}
Block materializeBlock(const Block & block)
{
if (!block)
return block;
Block res = block;
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
{
auto & element = res.getByPosition(i);
element.column = element.column->convertToFullColumnIfConst();
}
return res;
}
void materializeBlockInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
}
}

View File

@ -196,4 +196,8 @@ void getBlocksDifference(const Block & lhs, const Block & rhs, std::string & out
/// Properly handles cases, when column is a subcolumn and when it is compressed.
ColumnPtr getColumnFromBlock(const Block & block, const NameAndTypePair & column);
/// Converts columns-constants to full columns ("materializes" them).
Block materializeBlock(const Block & block);
void materializeBlockInplace(Block & block);
}

View File

@ -1,5 +1,4 @@
#include <boost/program_options.hpp>
#include <DataStreams/IBlockOutputStream.h>
#include <DataTypes/DataTypeFactory.h>
#include <Storages/IStorage.h>
#include <Storages/ColumnsDescription.h>
@ -10,9 +9,8 @@
#include <IO/ReadBufferFromFile.h>
#include <IO/LimitReadBuffer.h>
#include <Processors/Pipe.h>
#include <QueryPipeline/Pipe.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Formats/IInputFormat.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <Core/SettingsFields.h>
#include <DataStreams/SizeLimits.h>
#include <QueryPipeline/SizeLimits.h>
#include <Formats/FormatSettings.h>

View File

@ -1,146 +0,0 @@
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/IBlockInputStream.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Core/Block.h>
namespace DB
{
void BlockStreamProfileInfo::read(ReadBuffer & in)
{
readVarUInt(rows, in);
readVarUInt(blocks, in);
readVarUInt(bytes, in);
readBinary(applied_limit, in);
readVarUInt(rows_before_limit, in);
readBinary(calculated_rows_before_limit, in);
}
void BlockStreamProfileInfo::write(WriteBuffer & out) const
{
writeVarUInt(rows, out);
writeVarUInt(blocks, out);
writeVarUInt(bytes, out);
writeBinary(hasAppliedLimit(), out);
writeVarUInt(getRowsBeforeLimit(), out);
writeBinary(calculated_rows_before_limit, out);
}
void BlockStreamProfileInfo::setFrom(const BlockStreamProfileInfo & rhs, bool skip_block_size_info)
{
if (!skip_block_size_info)
{
rows = rhs.rows;
blocks = rhs.blocks;
bytes = rhs.bytes;
}
applied_limit = rhs.applied_limit;
rows_before_limit = rhs.rows_before_limit;
calculated_rows_before_limit = rhs.calculated_rows_before_limit;
}
size_t BlockStreamProfileInfo::getRowsBeforeLimit() const
{
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
return rows_before_limit;
}
bool BlockStreamProfileInfo::hasAppliedLimit() const
{
if (!calculated_rows_before_limit)
calculateRowsBeforeLimit();
return applied_limit;
}
void BlockStreamProfileInfo::update(Block & block)
{
update(block.rows(), block.bytes());
}
void BlockStreamProfileInfo::update(size_t num_rows, size_t num_bytes)
{
++blocks;
rows += num_rows;
bytes += num_bytes;
}
void BlockStreamProfileInfo::collectInfosForStreamsWithName(const char * name, BlockStreamProfileInfos & res) const
{
if (!parent)
return;
if (parent->getName() == name)
{
res.push_back(this);
return;
}
parent->forEachChild([&] (IBlockInputStream & child)
{
child.getProfileInfo().collectInfosForStreamsWithName(name, res);
return false;
});
}
void BlockStreamProfileInfo::calculateRowsBeforeLimit() const
{
calculated_rows_before_limit = true;
/// is there a Limit?
BlockStreamProfileInfos limits;
collectInfosForStreamsWithName("Limit", limits);
if (!limits.empty())
{
applied_limit = true;
/** Take the number of rows read below `PartialSorting`, if any, or below `Limit`.
* This is necessary, because sorting can return only part of the rows.
*/
BlockStreamProfileInfos partial_sortings;
collectInfosForStreamsWithName("PartialSorting", partial_sortings);
BlockStreamProfileInfos & limits_or_sortings = partial_sortings.empty() ? limits : partial_sortings;
for (const BlockStreamProfileInfo * info_limit_or_sort : limits_or_sortings)
{
info_limit_or_sort->parent->forEachChild([&] (IBlockInputStream & child)
{
rows_before_limit += child.getProfileInfo().rows;
return false;
});
}
}
else
{
/// Then the data about `rows_before_limit` can be in `RemoteBlockInputStream` (coming from a remote server).
BlockStreamProfileInfos remotes;
collectInfosForStreamsWithName("Remote", remotes);
collectInfosForStreamsWithName("TreeExecutor", remotes);
if (remotes.empty())
return;
for (const auto & info : remotes)
{
if (info->applied_limit)
{
applied_limit = true;
rows_before_limit += info->rows_before_limit;
}
}
}
}
}
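
The deleted file's successor is QueryPipeline/ProfileInfo.h, whose read()/write() calls appear earlier in this diff; since the type was renamed in place, presumably the field order on the wire stays rows, blocks, bytes, applied_limit, rows_before_limit, calculated_rows_before_limit. The VarUInt fields use the usual 7-bit little-endian varint encoding; a standalone decoding sketch (assuming that encoding, not the library routine itself):

    #include <cstdint>
    #include <istream>

    uint64_t readVarUIntSketch(std::istream & in)
    {
        uint64_t value = 0;
        for (int shift = 0; shift < 64; shift += 7)
        {
            int byte = in.get();
            if (byte < 0)
                break;                                   // truncated input
            value |= static_cast<uint64_t>(byte & 0x7F) << shift;
            if (!(byte & 0x80))                          // high bit clear: last byte
                break;
        }
        return value;
    }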

View File

@ -1,359 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <Core/Field.h>
#include <Interpreters/ProcessList.h>
#include <Access/EnabledQuota.h>
#include <Common/CurrentThread.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
namespace ProfileEvents
{
extern const Event ThrottlerSleepMicroseconds;
extern const Event SelectedRows;
extern const Event SelectedBytes;
}
namespace DB
{
namespace ErrorCodes
{
extern const int QUERY_WAS_CANCELLED;
extern const int TOO_MANY_ROWS;
extern const int TOO_MANY_BYTES;
extern const int TOO_MANY_ROWS_OR_BYTES;
extern const int LOGICAL_ERROR;
}
/// It's safe to access children without a mutex as long as these methods are called before the first call to `read()` or `readPrefix()`.
Block IBlockInputStream::read()
{
if (total_rows_approx)
{
progressImpl(Progress(0, 0, total_rows_approx));
total_rows_approx = 0;
}
if (!info.started)
{
info.total_stopwatch.start();
info.started = true;
}
Block res;
if (isCancelledOrThrowIfKilled())
return res;
if (!checkTimeLimit())
limit_exceeded_need_break = true;
if (!limit_exceeded_need_break)
res = readImpl();
if (res)
{
info.update(res);
if (enabled_extremes)
updateExtremes(res);
if (limits.mode == LimitsMode::LIMITS_CURRENT && !limits.size_limits.check(info.rows, info.bytes, "result", ErrorCodes::TOO_MANY_ROWS_OR_BYTES))
limit_exceeded_need_break = true;
if (quota)
checkQuota(res);
}
else
{
/** If the stream is over, then we will ask all children to abort the execution.
* This makes sense when running a query with LIMIT
* - there is a situation when all the necessary data has already been read,
* but children sources are still working,
* herewith they can work in separate threads or even remotely.
*/
cancel(false);
}
progress(Progress(res.rows(), res.bytes()));
#ifndef NDEBUG
if (res)
{
Block header = getHeader();
if (header)
assertBlocksHaveEqualStructure(res, header, getName());
}
#endif
return res;
}
void IBlockInputStream::readPrefix()
{
#ifndef NDEBUG
if (!read_prefix_is_called)
read_prefix_is_called = true;
else
throw Exception("readPrefix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
readPrefixImpl();
forEachChild([&] (IBlockInputStream & child)
{
child.readPrefix();
return false;
});
}
void IBlockInputStream::readSuffix()
{
#ifndef NDEBUG
if (!read_suffix_is_called)
read_suffix_is_called = true;
else
throw Exception("readSuffix is called twice for " + getName() + " stream", ErrorCodes::LOGICAL_ERROR);
#endif
forEachChild([&] (IBlockInputStream & child)
{
child.readSuffix();
return false;
});
readSuffixImpl();
}
void IBlockInputStream::updateExtremes(Block & block)
{
size_t num_columns = block.columns();
if (!extremes)
{
MutableColumns extremes_columns(num_columns);
for (size_t i = 0; i < num_columns; ++i)
{
const ColumnPtr & src = block.safeGetByPosition(i).column;
if (isColumnConst(*src))
{
/// Equal min and max.
extremes_columns[i] = src->cloneResized(2);
}
else
{
Field min_value;
Field max_value;
src->getExtremes(min_value, max_value);
extremes_columns[i] = src->cloneEmpty();
extremes_columns[i]->insert(min_value);
extremes_columns[i]->insert(max_value);
}
}
extremes = block.cloneWithColumns(std::move(extremes_columns));
}
else
{
for (size_t i = 0; i < num_columns; ++i)
{
ColumnPtr & old_extremes = extremes.safeGetByPosition(i).column;
if (isColumnConst(*old_extremes))
continue;
Field min_value = (*old_extremes)[0];
Field max_value = (*old_extremes)[1];
Field cur_min_value;
Field cur_max_value;
block.safeGetByPosition(i).column->getExtremes(cur_min_value, cur_max_value);
if (cur_min_value < min_value)
min_value = cur_min_value;
if (cur_max_value > max_value)
max_value = cur_max_value;
MutableColumnPtr new_extremes = old_extremes->cloneEmpty();
new_extremes->insert(min_value);
new_extremes->insert(max_value);
old_extremes = std::move(new_extremes);
}
}
}
bool IBlockInputStream::checkTimeLimit() const
{
return limits.speed_limits.checkTimeLimit(info.total_stopwatch, limits.timeout_overflow_mode);
}
void IBlockInputStream::checkQuota(Block & block)
{
switch (limits.mode)
{
case LimitsMode::LIMITS_TOTAL:
/// Checked in `progress` method.
break;
case LimitsMode::LIMITS_CURRENT:
{
UInt64 total_elapsed = info.total_stopwatch.elapsedNanoseconds();
quota->used({Quota::RESULT_ROWS, block.rows()}, {Quota::RESULT_BYTES, block.bytes()}, {Quota::EXECUTION_TIME, total_elapsed - prev_elapsed});
prev_elapsed = total_elapsed;
break;
}
}
}
void IBlockInputStream::progressImpl(const Progress & value)
{
if (progress_callback)
progress_callback(value);
if (process_list_elem)
{
if (!process_list_elem->updateProgressIn(value))
cancel(/* kill */ true);
/// The total amount of data processed or intended for processing in all leaf sources, possibly on remote servers.
ProgressValues progress = process_list_elem->getProgressIn();
size_t total_rows_estimate = std::max(progress.read_rows, progress.total_rows_to_read);
/** Check the restrictions on the amount of data to read, the speed of the query, the quota on the amount of data to read.
* NOTE: Maybe it makes sense to have them checked directly in ProcessList?
*/
if (limits.mode == LimitsMode::LIMITS_TOTAL)
{
if (!limits.size_limits.check(total_rows_estimate, progress.read_bytes, "rows to read",
ErrorCodes::TOO_MANY_ROWS, ErrorCodes::TOO_MANY_BYTES))
cancel(false);
}
size_t total_rows = progress.total_rows_to_read;
constexpr UInt64 profile_events_update_period_microseconds = 10 * 1000; // 10 milliseconds
UInt64 total_elapsed_microseconds = info.total_stopwatch.elapsedMicroseconds();
if (last_profile_events_update_time + profile_events_update_period_microseconds < total_elapsed_microseconds)
{
CurrentThread::updatePerformanceCounters();
last_profile_events_update_time = total_elapsed_microseconds;
}
limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_elapsed_microseconds);
if (quota && limits.mode == LimitsMode::LIMITS_TOTAL)
quota->used({Quota::READ_ROWS, value.read_rows}, {Quota::READ_BYTES, value.read_bytes});
}
ProfileEvents::increment(ProfileEvents::SelectedRows, value.read_rows);
ProfileEvents::increment(ProfileEvents::SelectedBytes, value.read_bytes);
}
void IBlockInputStream::cancel(bool kill)
{
if (kill)
is_killed = true;
bool old_val = false;
if (!is_cancelled.compare_exchange_strong(old_val, true, std::memory_order_seq_cst, std::memory_order_relaxed))
return;
forEachChild([&] (IBlockInputStream & child)
{
child.cancel(kill);
return false;
});
}
bool IBlockInputStream::isCancelled() const
{
return is_cancelled;
}
bool IBlockInputStream::isCancelledOrThrowIfKilled() const
{
if (!is_cancelled)
return false;
if (is_killed)
throw Exception("Query was cancelled", ErrorCodes::QUERY_WAS_CANCELLED);
return true;
}
void IBlockInputStream::setProgressCallback(const ProgressCallback & callback)
{
progress_callback = callback;
forEachChild([&] (IBlockInputStream & child)
{
child.setProgressCallback(callback);
return false;
});
}
void IBlockInputStream::setProcessListElement(QueryStatus * elem)
{
process_list_elem = elem;
forEachChild([&] (IBlockInputStream & child)
{
child.setProcessListElement(elem);
return false;
});
}
Block IBlockInputStream::getTotals()
{
if (totals)
return totals;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getTotals();
return bool(res);
});
return res;
}
Block IBlockInputStream::getExtremes()
{
if (extremes)
return extremes;
Block res;
forEachChild([&] (IBlockInputStream & child)
{
res = child.getExtremes();
return bool(res);
});
return res;
}
}

View File

@ -1,271 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/BlockStreamProfileInfo.h>
#include <DataStreams/ExecutionSpeedLimits.h>
#include <DataStreams/SizeLimits.h>
#include <DataStreams/StreamLocalLimits.h>
#include <IO/Progress.h>
#include <Storages/TableLockHolder.h>
#include <Common/TypePromotion.h>
#include <atomic>
#include <shared_mutex>
namespace DB
{
namespace ErrorCodes
{
}
class ProcessListElement;
class EnabledQuota;
class QueryStatus;
/** The stream interface for reading data by blocks from the database.
* Relational operations are also supposed to be implemented via this interface.
* Keeps track of how the block source works.
* Lets you get information for profiling: rows per second, blocks per second, megabytes per second, etc.
* Allows you to stop reading data (in nested sources).
*/
class IBlockInputStream : public TypePromotion<IBlockInputStream>
{
friend struct BlockStreamProfileInfo;
public:
IBlockInputStream() { info.parent = this; }
virtual ~IBlockInputStream() = default;
IBlockInputStream(const IBlockInputStream &) = delete;
IBlockInputStream & operator=(const IBlockInputStream &) = delete;
/// To output the data stream transformation tree (query execution plan).
virtual String getName() const = 0;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* It is guaranteed that method "read" returns blocks of exactly that structure.
*/
virtual Block getHeader() const = 0;
virtual const BlockMissingValues & getMissingValues() const
{
static const BlockMissingValues none;
return none;
}
/** Read next block.
* If there are no more blocks, return an empty block (for which operator `bool` returns false).
* NOTE: Only one thread can read from one instance of IBlockInputStream simultaneously.
* This also applies for readPrefix, readSuffix.
*/
Block read();
/** Read something before starting all data or after the end of all data.
* In the `readSuffix` function, you can implement a finalization that can lead to an exception.
* readPrefix() must be called before the first call to read().
* readSuffix() should be called after read() returns an empty block, or after a call to cancel(), but not during read() execution.
*/
/** The default implementation calls readPrefixImpl() on itself, and then readPrefix() recursively for all children.
* There are cases when you do not want the children's `readPrefix` to be called synchronously from this function,
* but you want them to be called, for example, in separate threads (for parallel initialization of children).
* In that case, override the `readPrefix` function.
*/
virtual void readPrefix();
/** The default implementation calls readSuffix() recursively on all children, and then readSuffixImpl() on itself.
* If this stream calls children's read() in a separate thread, this behavior is usually incorrect:
* the child's readSuffix() must not be called while the same child's read() is executing in another thread.
* In that case, override this method so that readSuffix() in children is called, for example, after joining the threads.
*/
virtual void readSuffix();
/// Do not allow to change the table while the blocks stream and its children are alive.
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
/// Get information about execution speed.
const BlockStreamProfileInfo & getProfileInfo() const { return info; }
/** Get "total" values.
* The default implementation takes them from itself or from the first child source in which they are.
* The overridden method can perform some calculations. For example, apply an expression to the `totals` of the child source.
* There can be no total values - then an empty block is returned.
*
* Call this method only after all the data has been retrieved with `read`,
* otherwise there will be problems if any data at the same time is computed in another thread.
*/
virtual Block getTotals();
/// The same for minimums and maximums.
virtual Block getExtremes();
/** Set the execution progress bar callback.
* The callback is passed to all child sources.
* By default, it is called for leaf sources, after each block.
* (But this can be overridden in the progress() method)
* The function takes the number of rows in the last block, the number of bytes in the last block.
* Note that the callback can be called from different threads.
*/
virtual void setProgressCallback(const ProgressCallback & callback);
/** In this method:
* - the progress callback is called;
* - the status of the query execution in ProcessList is updated;
* - checks restrictions and quotas that should be checked not within the same source,
* but over the total amount of resources spent in all sources at once (information in the ProcessList).
*/
virtual void progress(const Progress & value)
{
/// The data for progress is taken from leaf sources.
if (children.empty())
progressImpl(value);
}
void progressImpl(const Progress & value);
/** Set the pointer to the process list item.
* It is passed to all child sources.
* General information about the resources spent on the request will be written into it.
* Based on this information, the quota and some restrictions will be checked.
* This information will also be available in the SHOW PROCESSLIST request.
*/
virtual void setProcessListElement(QueryStatus * elem);
/** Set the approximate total number of rows to read.
*/
void addTotalRowsApprox(size_t value) { total_rows_approx += value; }
/** Ask to abort the receipt of data as soon as possible.
* By default - just sets the flag is_cancelled and asks that all children be interrupted.
* This function can be called several times, including simultaneously from different threads.
* It has two modes:
* with kill = false, only is_cancelled is set: streams stop silently, possibly returning some processed data.
* with kill = true, is_killed is also set: queries stop with an exception.
*/
virtual void cancel(bool kill);
bool isCancelled() const;
bool isCancelledOrThrowIfKilled() const;
/** Set limitations that checked on each block. */
virtual void setLimits(const StreamLocalLimits & limits_)
{
limits = limits_;
}
const StreamLocalLimits & getLimits() const
{
return limits;
}
/** Set the quota. If you set a quota on the amount of raw data,
* then you should also set mode = LIMITS_TOTAL to LocalLimits with setLimits.
*/
virtual void setQuota(const std::shared_ptr<const EnabledQuota> & new_quota)
{
quota = new_quota;
}
/// Enable calculation of minimums and maximums by the result columns.
void enableExtremes() { enabled_extremes = true; }
protected:
/// Order is important: `table_locks` must be destroyed after `children` so that tables from
/// which child streams read are protected by the locks during the lifetime of the child streams.
std::vector<TableLockHolder> table_locks;
BlockInputStreams children;
std::shared_mutex children_mutex;
BlockStreamProfileInfo info;
std::atomic<bool> is_cancelled{false};
std::atomic<bool> is_killed{false};
ProgressCallback progress_callback;
QueryStatus * process_list_elem = nullptr;
/// According to total_stopwatch in microseconds
UInt64 last_profile_events_update_time = 0;
/// Additional information that can be generated during the work process.
/// Total values during aggregation.
Block totals;
/// Minimums and maximums. The first row of the block - minimums, the second - the maximums.
Block extremes;
void addChild(const BlockInputStreamPtr & child)
{
std::unique_lock lock(children_mutex);
children.push_back(child);
}
/** Check limits.
* But only those that can be checked within each separate stream.
*/
bool checkTimeLimit() const;
#ifndef NDEBUG
bool read_prefix_is_called = false;
bool read_suffix_is_called = false;
#endif
private:
bool enabled_extremes = false;
/// The limit on the number of rows/bytes has been exceeded, and execution must stop on the next `read` call, as if the stream had run out of data.
bool limit_exceeded_need_break = false;
/// Limitations and quotas.
StreamLocalLimits limits;
std::shared_ptr<const EnabledQuota> quota; /// If nullptr - the quota is not used.
UInt64 prev_elapsed = 0;
/// The approximate total number of rows to read. For progress bar.
size_t total_rows_approx = 0;
/// Derived classes must implement this function.
virtual Block readImpl() = 0;
/// Here you can do a preliminary initialization.
virtual void readPrefixImpl() {}
/// Here you need to do a finalization, which can lead to an exception.
virtual void readSuffixImpl() {}
void updateExtremes(Block & block);
/** Check quotas.
* But only those that can be checked within each separate stream.
*/
void checkQuota(Block & block);
size_t checkDepthImpl(size_t max_depth, size_t level) const;
template <typename F>
void forEachChild(F && f)
{
/// NOTE: Acquires a read lock, therefore f() should be thread-safe
std::shared_lock lock(children_mutex);
// Reduce lock scope and avoid recursive locking since that is undefined for shared_mutex.
const auto children_copy = children;
lock.unlock();
for (auto & child : children_copy)
if (f(*child))
return;
}
};
}

View File

@ -1,70 +0,0 @@
#pragma once
#include <Core/Block.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Storages/TableLockHolder.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <string>
#include <vector>
namespace DB
{
struct Progress;
/** Interface of stream for writing data (into table, filesystem, network, terminal, etc.)
*/
class IBlockOutputStream : private boost::noncopyable
{
public:
IBlockOutputStream() = default;
/** Get data structure of the stream in a form of "header" block (it is also called "sample block").
* Header block contains column names, data types, columns of size 0. Constant columns must have corresponding values.
* You must pass blocks of exactly this structure to the 'write' method.
*/
virtual Block getHeader() const = 0;
/** Write block.
*/
virtual void write(const Block & block) = 0;
/** Write or do something before all data or after all data.
*/
virtual void writePrefix() {}
virtual void writeSuffix() {}
/** Flush output buffers if any.
*/
virtual void flush() {}
/** Methods to set additional information for output in formats, that support it.
*/
virtual void setRowsBeforeLimit(size_t /*rows_before_limit*/) {}
virtual void setTotals(const Block & /*totals*/) {}
virtual void setExtremes(const Block & /*extremes*/) {}
/** Notify about progress. This method may be called from different threads.
* The passed values are deltas that must be summed up.
*/
virtual void onProgress(const Progress & /*progress*/) {}
/** Content-Type to set when sending HTTP response.
*/
virtual std::string getContentType() const { return "text/plain; charset=UTF-8"; }
virtual ~IBlockOutputStream() = default;
/** Don't allow the table to be altered while an instance of the stream is alive.
*/
void addTableLock(const TableLockHolder & lock) { table_locks.push_back(lock); }
private:
std::vector<TableLockHolder> table_locks;
};
}

View File

@ -1,17 +0,0 @@
#pragma once
#include <memory>
#include <vector>
namespace DB
{
class IBlockInputStream;
class IBlockOutputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
using BlockInputStreams = std::vector<BlockInputStreamPtr>;
using BlockOutputStreamPtr = std::shared_ptr<IBlockOutputStream>;
using BlockOutputStreams = std::vector<BlockOutputStreamPtr>;
}

View File

@ -1,34 +0,0 @@
#pragma once
#include <DataStreams/materializeBlock.h>
#include <DataStreams/IBlockOutputStream.h>
namespace DB
{
/** Converts columns-constants to full columns ("materializes" them).
*/
class MaterializingBlockOutputStream : public IBlockOutputStream
{
public:
MaterializingBlockOutputStream(const BlockOutputStreamPtr & output_, const Block & header_)
: output{output_}, header(header_) {}
Block getHeader() const override { return header; }
void write(const Block & block) override { output->write(materializeBlock(block)); }
void flush() override { output->flush(); }
void writePrefix() override { output->writePrefix(); }
void writeSuffix() override { output->writeSuffix(); }
void setRowsBeforeLimit(size_t rows_before_limit) override { output->setRowsBeforeLimit(rows_before_limit); }
void setTotals(const Block & totals) override { output->setTotals(materializeBlock(totals)); }
void setExtremes(const Block & extremes) override { output->setExtremes(materializeBlock(extremes)); }
void onProgress(const Progress & progress) override { output->onProgress(progress); }
String getContentType() const override { return output->getContentType(); }
private:
BlockOutputStreamPtr output;
Block header;
};
}

View File

@ -1,41 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
namespace DB
{
/** A stream of blocks from which you can read one block.
*/
class OneBlockInputStream : public IBlockInputStream
{
public:
explicit OneBlockInputStream(Block block_) : block(std::move(block_)) { block.checkNumberOfRows(); }
String getName() const override { return "One"; }
Block getHeader() const override
{
Block res;
for (const auto & elem : block)
res.insert({ elem.column->cloneEmpty(), elem.type, elem.name });
return res;
}
protected:
Block readImpl() override
{
if (has_been_read)
return Block();
has_been_read = true;
return block;
}
private:
Block block;
bool has_been_read = false;
};
}
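
OneBlockInputStream's role of serving a single prepared block is taken over by SourceFromSingleChunk, which several hunks above now include. A hedged sketch of the replacement, assuming SourceFromSingleChunk's Block-taking constructor (only the includes shown elsewhere in this diff are relied on):

    // Before: a stream that yields the block once.
    BlockInputStreamPtr stream = std::make_shared<OneBlockInputStream>(block);

    // After (sketch): the block becomes a one-chunk source inside a pipe.
    Pipe pipe(std::make_shared<SourceFromSingleChunk>(std::move(block)));
    QueryPipeline pipeline(std::move(pipe));
    PullingPipelineExecutor executor(pipeline);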

View File

@ -1,32 +0,0 @@
#include <DataStreams/SquashingBlockInputStream.h>
namespace DB
{
SquashingBlockInputStream::SquashingBlockInputStream(
const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes, bool reserve_memory)
: header(src->getHeader()), transform(min_block_size_rows, min_block_size_bytes, reserve_memory)
{
children.emplace_back(src);
}
Block SquashingBlockInputStream::readImpl()
{
while (!all_read)
{
Block block = children[0]->read();
if (!block)
all_read = true;
auto squashed_block = transform.add(std::move(block));
if (squashed_block)
{
return squashed_block;
}
}
return {};
}
}

View File

@ -1,31 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
/** Merges consecutive blocks of a stream up to the specified minimum size.
*/
class SquashingBlockInputStream : public IBlockInputStream
{
public:
SquashingBlockInputStream(const BlockInputStreamPtr & src, size_t min_block_size_rows, size_t min_block_size_bytes,
bool reserve_memory = false);
String getName() const override { return "Squashing"; }
Block getHeader() const override { return header; }
protected:
Block readImpl() override;
private:
Block header;
SquashingTransform transform;
bool all_read = false;
};
}

View File

@ -1,54 +0,0 @@
#include <DataStreams/SquashingBlockOutputStream.h>
namespace DB
{
SquashingBlockOutputStream::SquashingBlockOutputStream(BlockOutputStreamPtr dst, Block header_, size_t min_block_size_rows, size_t min_block_size_bytes)
: output(std::move(dst)), header(std::move(header_)), transform(min_block_size_rows, min_block_size_bytes)
{
}
void SquashingBlockOutputStream::write(const Block & block)
{
auto squashed_block = transform.add(block);
if (squashed_block)
output->write(squashed_block);
}
void SquashingBlockOutputStream::finalize()
{
if (all_written)
return;
all_written = true;
auto squashed_block = transform.add({});
if (squashed_block)
output->write(squashed_block);
}
void SquashingBlockOutputStream::flush()
{
if (!disable_flush)
finalize();
output->flush();
}
void SquashingBlockOutputStream::writePrefix()
{
output->writePrefix();
}
void SquashingBlockOutputStream::writeSuffix()
{
finalize();
output->writeSuffix();
}
}

View File

@ -1,39 +0,0 @@
#pragma once
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/SquashingTransform.h>
namespace DB
{
/** Merges consecutive blocks of a stream up to the specified minimum size.
*/
class SquashingBlockOutputStream : public IBlockOutputStream
{
public:
SquashingBlockOutputStream(BlockOutputStreamPtr dst, Block header_, size_t min_block_size_rows, size_t min_block_size_bytes);
Block getHeader() const override { return header; }
void write(const Block & block) override;
void flush() override;
void writePrefix() override;
void writeSuffix() override;
/// Don't write blocks smaller than the specified size, even when the user calls the flush method.
void disableFlush() { disable_flush = true; }
private:
BlockOutputStreamPtr output;
Block header;
SquashingTransform transform;
bool all_written = false;
void finalize();
bool disable_flush = false;
};
}
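
The squashing wrapper streams are removed, but SquashingTransform itself survives (its include moves to Common/SquashingTransform.h earlier in this diff), and the deleted code above shows its whole protocol: add() buffers until the size thresholds are met, and add({}) flushes the remainder. Condensed, with read_next() and write() as illustrative stand-ins:

    SquashingTransform transform(min_block_size_rows, min_block_size_bytes);

    while (Block block = read_next())
        if (auto squashed = transform.add(std::move(block)))
            write(squashed);            // only emitted once thresholds are reached

    if (auto squashed = transform.add({}))
        write(squashed);                // an empty block finalizes the transform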

View File

@ -1,86 +0,0 @@
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <Common/ThreadPool.h>
namespace DB
{
namespace
{
bool isAtomicSet(std::atomic<bool> * val)
{
return ((val != nullptr) && val->load(std::memory_order_seq_cst));
}
}
template <typename TCancelCallback, typename TProgressCallback>
void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCallback && is_cancelled, TProgressCallback && progress)
{
from.readPrefix();
to.writePrefix();
while (Block block = from.read())
{
if (is_cancelled())
break;
to.write(block);
progress(block);
}
if (is_cancelled())
return;
/// For outputting additional information in some formats.
if (from.getProfileInfo().hasAppliedLimit())
to.setRowsBeforeLimit(from.getProfileInfo().getRowsBeforeLimit());
to.setTotals(from.getTotals());
to.setExtremes(from.getExtremes());
if (is_cancelled())
return;
from.readSuffix();
to.writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, progress);
}
inline void doNothing(const Block &) {}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
{
return isAtomicSet(is_cancelled);
};
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled, doNothing);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress)
{
copyDataImpl(from, to, is_cancelled, progress);
}
}
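
copyData's input-to-output pumping is replaced by completing a pipeline and running it to the end; CompletedPipelineExecutor is what the bridge and MySQL handlers earlier in this diff now include. A hedged sketch of the equivalent, assuming QueryPipeline::complete and the executor's execute(), which this diff only references by include:

    // Before:
    copyData(*input_stream, *output_stream);

    // After (sketch):
    QueryPipeline pipeline(std::move(source_pipe));
    pipeline.complete(std::move(sink));         // attach the consuming end
    CompletedPipelineExecutor executor(pipeline);
    executor.execute();                         // pumps until the source is exhausted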

View File

@ -1,27 +0,0 @@
#pragma once
#include <DataStreams/IBlockStream_fwd.h>
#include <atomic>
#include <functional>
namespace DB
{
class Block;
/** Copies data from the InputStream into the OutputStream
* (for example, from the database to the console, etc.)
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<void(const Block & block)> & progress,
std::atomic<bool> * is_cancelled = nullptr);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,
const std::function<void(const Block & block)> & progress);
}

View File

@ -1,27 +0,0 @@
#include <DataStreams/finalizeBlock.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include <Columns/ColumnAggregateFunction.h>
#include <Common/typeid_cast.h>
namespace DB
{
void finalizeBlock(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
{
ColumnWithTypeAndName & current = block.getByPosition(i);
const DataTypeAggregateFunction * unfinalized_type = typeid_cast<const DataTypeAggregateFunction *>(current.type.get());
if (unfinalized_type)
{
current.type = unfinalized_type->getReturnType();
if (current.column)
{
auto mut_column = IColumn::mutate(std::move(current.column));
current.column = ColumnAggregateFunction::convertToValues(std::move(mut_column));
}
}
}
}
}

View File

@ -1,9 +0,0 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/// Converts aggregate function columns with non-finalized states to final values
void finalizeBlock(Block & block);
}

View File

@ -1,29 +0,0 @@
#include <DataStreams/materializeBlock.h>
namespace DB
{
Block materializeBlock(const Block & block)
{
if (!block)
return block;
Block res = block;
size_t columns = res.columns();
for (size_t i = 0; i < columns; ++i)
{
auto & element = res.getByPosition(i);
element.column = element.column->convertToFullColumnIfConst();
}
return res;
}
void materializeBlockInplace(Block & block)
{
for (size_t i = 0; i < block.columns(); ++i)
block.getByPosition(i).column = block.getByPosition(i).column->convertToFullColumnIfConst();
}
}

View File

@ -1,14 +0,0 @@
#pragma once
#include <Core/Block.h>
namespace DB
{
/** Converts columns-constants to full columns ("materializes" them).
*/
Block materializeBlock(const Block & block);
void materializeBlockInplace(Block & block);
}

View File

@ -4,8 +4,7 @@
#include <Databases/DatabaseReplicatedSettings.h>
#include <Common/ZooKeeper/ZooKeeper.h>
#include <Core/BackgroundSchedulePool.h>
#include <DataStreams/BlockIO.h>
#include <DataStreams/OneBlockInputStream.h>
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/Context.h>

View File

@ -11,9 +11,9 @@
# include <DataTypes/convertMySQLDataType.h>
# include <Databases/MySQL/DatabaseMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLSource.h>
# include <Processors/Sources/MySQLSource.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/QueryPipelineBuilder.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <IO/Operators.h>
# include <Interpreters/Context.h>
# include <Parsers/ASTCreateQuery.h>

View File

@ -9,8 +9,8 @@
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <Formats/MySQLSource.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Sources/MySQLSource.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>

View File

@ -5,9 +5,9 @@
#include <Core/Block.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLSource.h>
#include <Processors/Sources/MySQLSource.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/WriteBufferFromFile.h>
#include <Common/quoteString.h>

View File

@ -9,16 +9,14 @@
# include <random>
# include <Columns/ColumnTuple.h>
# include <Columns/ColumnDecimal.h>
# include <Processors/QueryPipelineBuilder.h>
# include <QueryPipeline/QueryPipelineBuilder.h>
# include <Processors/Executors/PullingPipelineExecutor.h>
# include <Processors/Executors/CompletedPipelineExecutor.h>
# include <Processors/Sources/SourceFromSingleChunk.h>
# include <Processors/Transforms/CountingTransform.h>
# include <DataStreams/OneBlockInputStream.h>
# include <DataStreams/copyData.h>
# include <Databases/MySQL/DatabaseMaterializedMySQL.h>
# include <Databases/MySQL/MaterializeMetadata.h>
# include <Formats/MySQLSource.h>
# include <Processors/Sources/MySQLSource.h>
# include <IO/ReadBufferFromString.h>
# include <Interpreters/Context.h>
# include <Interpreters/executeQuery.h>

View File

@ -8,7 +8,7 @@
# include <mutex>
# include <Core/MySQL/MySQLClient.h>
# include <DataStreams/BlockIO.h>
# include <QueryPipeline/BlockIO.h>
# include <DataTypes/DataTypeString.h>
# include <DataTypes/DataTypesNumber.h>
# include <Databases/DatabaseOrdinary.h>

View File

@ -10,6 +10,7 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE" OR CMAKE_BUILD_TYPE_UC STREQUAL "RELW
set_source_files_properties(
FlatDictionary.cpp
HashedDictionary.cpp
HashedArrayDictionary.cpp
CacheDictionary.cpp
RangeHashedDictionary.cpp
DirectDictionary.cpp

View File

@ -14,7 +14,7 @@
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace ProfileEvents
{

View File

@ -2,11 +2,11 @@
#include <memory>
#include <Client/ConnectionPool.h>
#include <Processors/Sources/RemoteSource.h>
#include <DataStreams/RemoteQueryExecutor.h>
#include <QueryPipeline/RemoteQueryExecutor.h>
#include <Interpreters/ActionsDAG.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <IO/ConnectionTimeouts.h>
#include <Interpreters/Session.h>
#include <Interpreters/executeQuery.h>

View File

@ -16,7 +16,7 @@
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
namespace DB

View File

@ -1,7 +1,6 @@
#include "DictionarySourceHelpers.h"
#include <Columns/ColumnsNumber.h>
#include <Core/ColumnWithTypeAndName.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <DataTypes/DataTypesNumber.h>
#include <IO/WriteHelpers.h>
#include "DictionaryStructure.h"

View File

@ -8,7 +8,7 @@
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
namespace DB

View File

@ -4,8 +4,8 @@
#include <base/LocalDateTime.h>
#include <Common/ShellCommand.h>
#include <DataStreams/ShellCommandSource.h>
#include <DataStreams/formatBlock.h>
#include <Processors/Sources/ShellCommandSource.h>
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>

View File

@ -4,7 +4,7 @@
#include <base/LocalDateTime.h>
#include <Common/ShellCommand.h>
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>

View File

@ -7,7 +7,7 @@
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryStructure.h>
#include <DataStreams/ShellCommandSource.h>
#include <Processors/Sources/ShellCommandSource.h>
namespace DB

View File

@ -10,7 +10,7 @@
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Processors/QueryPipelineBuilder.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Processors/Executors/PullingPipelineExecutor.h>
#include <Dictionaries/DictionarySource.h>

View File

@ -1,6 +1,5 @@
#include "HTTPDictionarySource.h"
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/formatBlock.h>
#include <Formats/formatBlock.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <IO/ReadWriteBufferFromHTTP.h>

View File

@ -0,0 +1,691 @@
#include "HashedArrayDictionary.h"
#include <Core/Defines.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnNullable.h>
#include <Functions/FunctionHelpers.h>
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int DICTIONARY_IS_EMPTY;
extern const int UNSUPPORTED_METHOD;
}
template <DictionaryKeyType dictionary_key_type>
HashedArrayDictionary<dictionary_key_type>::HashedArrayDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const HashedArrayDictionaryStorageConfiguration & configuration_,
BlockPtr update_field_loaded_block_)
: IDictionary(dict_id_)
, dict_struct(dict_struct_)
, source_ptr(std::move(source_ptr_))
, configuration(configuration_)
, update_field_loaded_block(std::move(update_field_loaded_block_))
{
createAttributes();
loadData();
calculateBytesAllocated();
}
template <DictionaryKeyType dictionary_key_type>
ColumnPtr HashedArrayDictionary<dictionary_key_type>::getColumn(
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
const DataTypes & key_types [[maybe_unused]],
const ColumnPtr & default_values_column) const
{
if (dictionary_key_type == DictionaryKeyType::Complex)
dict_struct.validateKeyTypes(key_types);
ColumnPtr result;
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, arena_holder.getComplexKeyArena());
const size_t size = extractor.getKeysSize();
const auto & dictionary_attribute = dict_struct.getAttribute(attribute_name, result_type);
const size_t attribute_index = dict_struct.attribute_name_to_index.find(attribute_name)->second;
auto & attribute = attributes[attribute_index];
bool is_attribute_nullable = attribute.is_index_null.has_value();
ColumnUInt8::MutablePtr col_null_map_to;
ColumnUInt8::Container * vec_null_map_to = nullptr;
if (attribute.is_index_null)
{
col_null_map_to = ColumnUInt8::create(size, false);
vec_null_map_to = &col_null_map_to->getData();
}
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
DictionaryDefaultValueExtractor<AttributeType> default_value_extractor(dictionary_attribute.null_value, default_values_column);
auto column = ColumnProvider::getColumn(dictionary_attribute, size);
if constexpr (std::is_same_v<ValueType, Array>)
{
auto * out = column.get();
getItemsImpl<ValueType, false>(
attribute,
extractor,
[&](const size_t, const Array & value, bool) { out->insert(value); },
default_value_extractor);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto * out = column.get();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
extractor,
[&](size_t row, const StringRef value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out->insertData(value.data, value.size);
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
extractor,
[&](size_t, const StringRef value, bool) { out->insertData(value.data, value.size); },
default_value_extractor);
}
else
{
auto & out = column->getData();
if (is_attribute_nullable)
getItemsImpl<ValueType, true>(
attribute,
extractor,
[&](size_t row, const auto value, bool is_null)
{
(*vec_null_map_to)[row] = is_null;
out[row] = value;
},
default_value_extractor);
else
getItemsImpl<ValueType, false>(
attribute,
extractor,
[&](size_t row, const auto value, bool) { out[row] = value; },
default_value_extractor);
}
result = std::move(column);
};
callOnDictionaryAttributeType(attribute.type, type_call);
if (is_attribute_nullable)
result = ColumnNullable::create(std::move(result), std::move(col_null_map_to));
return result;
}
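/// Note: callOnDictionaryAttributeType dispatches on the attribute's runtime type and invokes
/// type_call with the matching compile-time AttributeType, so the three branches above
/// (Array, StringRef, and plain value types) cover every container alternative.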
template <DictionaryKeyType dictionary_key_type>
ColumnUInt8::Ptr HashedArrayDictionary<dictionary_key_type>::hasKeys(const Columns & key_columns, const DataTypes & key_types) const
{
if (dictionary_key_type == DictionaryKeyType::Complex)
dict_struct.validateKeyTypes(key_types);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> extractor(key_columns, arena_holder.getComplexKeyArena());
size_t keys_size = extractor.getKeysSize();
auto result = ColumnUInt8::create(keys_size, false);
auto & out = result->getData();
if (attributes.empty())
{
query_count.fetch_add(keys_size, std::memory_order_relaxed);
return result;
}
size_t keys_found = 0;
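/// The extractor yields keys row by row: extractCurrentKey materializes the current row's key
/// (for complex keys it is serialized into the arena) and rollbackCurrentKey releases that
/// per-row allocation before advancing to the next row.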
for (size_t requested_key_index = 0; requested_key_index < keys_size; ++requested_key_index)
{
auto requested_key = extractor.extractCurrentKey();
out[requested_key_index] = key_attribute.container.find(requested_key) != key_attribute.container.end();
keys_found += out[requested_key_index];
extractor.rollbackCurrentKey();
}
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return result;
}
template <DictionaryKeyType dictionary_key_type>
ColumnPtr HashedArrayDictionary<dictionary_key_type>::getHierarchy(ColumnPtr key_column [[maybe_unused]], const DataTypePtr &) const
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
PaddedPODArray<UInt64> keys_backup_storage;
const auto & keys = getColumnVectorData(this, key_column, keys_backup_storage);
size_t hierarchical_attribute_index = *dict_struct.hierarchical_attribute_index;
const auto & dictionary_attribute = dict_struct.attributes[hierarchical_attribute_index];
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const auto & key_attribute_container = key_attribute.container;
const UInt64 null_value = dictionary_attribute.null_value.template get<UInt64>();
const AttributeContainerType<UInt64> & parent_keys_container = std::get<AttributeContainerType<UInt64>>(hierarchical_attribute.container);
auto is_key_valid_func = [&](auto & key) { return key_attribute_container.find(key) != key_attribute_container.end(); };
size_t keys_found = 0;
auto get_parent_func = [&](auto & hierarchy_key)
{
std::optional<UInt64> result;
auto it = key_attribute_container.find(hierarchy_key);
if (it != key_attribute_container.end())
result = parent_keys_container[it->getMapped()];
keys_found += result.has_value();
return result;
};
auto dictionary_hierarchy_array = getKeysHierarchyArray(keys, null_value, is_key_valid_func, get_parent_func);
query_count.fetch_add(keys.size(), std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return dictionary_hierarchy_array;
}
else
{
return nullptr;
}
}
template <DictionaryKeyType dictionary_key_type>
ColumnUInt8::Ptr HashedArrayDictionary<dictionary_key_type>::isInHierarchy(
ColumnPtr key_column [[maybe_unused]],
ColumnPtr in_key_column [[maybe_unused]],
const DataTypePtr &) const
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
PaddedPODArray<UInt64> keys_backup_storage;
const auto & keys = getColumnVectorData(this, key_column, keys_backup_storage);
PaddedPODArray<UInt64> keys_in_backup_storage;
const auto & keys_in = getColumnVectorData(this, in_key_column, keys_in_backup_storage);
size_t hierarchical_attribute_index = *dict_struct.hierarchical_attribute_index;
const auto & dictionary_attribute = dict_struct.attributes[hierarchical_attribute_index];
auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const auto & key_attribute_container = key_attribute.container;
const UInt64 null_value = dictionary_attribute.null_value.template get<UInt64>();
const AttributeContainerType<UInt64> & parent_keys_container = std::get<AttributeContainerType<UInt64>>(hierarchical_attribute.container);
auto is_key_valid_func = [&](auto & key) { return key_attribute_container.find(key) != key_attribute_container.end(); };
size_t keys_found = 0;
auto get_parent_func = [&](auto & hierarchy_key)
{
std::optional<UInt64> result;
auto it = key_attribute_container.find(hierarchy_key);
if (it != key_attribute_container.end())
result = parent_keys_container[it->getMapped()];
keys_found += result.has_value();
return result;
};
auto result = getKeysIsInHierarchyColumn(keys, keys_in, null_value, is_key_valid_func, get_parent_func);
query_count.fetch_add(keys.size(), std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return result;
}
else
{
return nullptr;
}
}
template <DictionaryKeyType dictionary_key_type>
ColumnPtr HashedArrayDictionary<dictionary_key_type>::getDescendants(
ColumnPtr key_column [[maybe_unused]],
const DataTypePtr &,
size_t level [[maybe_unused]]) const
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
{
PaddedPODArray<UInt64> keys_backup;
const auto & keys = getColumnVectorData(this, key_column, keys_backup);
size_t hierarchical_attribute_index = *dict_struct.hierarchical_attribute_index;
const auto & hierarchical_attribute = attributes[hierarchical_attribute_index];
const AttributeContainerType<UInt64> & parent_keys_container = std::get<AttributeContainerType<UInt64>>(hierarchical_attribute.container);
const auto & key_attribute_container = key_attribute.container;
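/// key_attribute.container maps key -> element index; invert it here because
/// parent_keys_container is addressed by element index, not by key.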
HashMap<size_t, UInt64> index_to_key;
index_to_key.reserve(key_attribute.container.size());
for (auto & [key, value] : key_attribute_container)
index_to_key[value] = key;
HashMap<UInt64, PaddedPODArray<UInt64>> parent_to_child;
for (size_t i = 0; i < parent_keys_container.size(); ++i)
{
const auto * it = index_to_key.find(i);
if (it == index_to_key.end())
continue;
auto child_key = it->getMapped();
auto parent_key = parent_keys_container[i];
parent_to_child[parent_key].emplace_back(child_key);
}
size_t keys_found = 0;
auto result = getKeysDescendantsArray(keys, parent_to_child, level, keys_found);
query_count.fetch_add(keys.size(), std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
return result;
}
else
{
return nullptr;
}
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::createAttributes()
{
const auto size = dict_struct.attributes.size();
attributes.reserve(size);
for (const auto & dictionary_attribute : dict_struct.attributes)
{
auto type_call = [&, this](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto is_index_null = dictionary_attribute.is_nullable ? std::make_optional<std::vector<bool>>() : std::optional<std::vector<bool>>{};
std::unique_ptr<Arena> string_arena = std::is_same_v<AttributeType, String> ? std::make_unique<Arena>() : nullptr;
Attribute attribute{dictionary_attribute.underlying_type, AttributeContainerType<ValueType>(), std::move(is_index_null), std::move(string_arena)};
attributes.emplace_back(std::move(attribute));
};
callOnDictionaryAttributeType(dictionary_attribute.underlying_type, type_call);
}
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::updateData()
{
if (!update_field_loaded_block || update_field_loaded_block->rows() == 0)
{
QueryPipeline pipeline(source_ptr->loadUpdatedAll());
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
/// We are using this to keep saved data if the input stream consists of multiple blocks
if (!update_field_loaded_block)
update_field_loaded_block = std::make_shared<DB::Block>(block.cloneEmpty());
for (size_t attribute_index = 0; attribute_index < block.columns(); ++attribute_index)
{
const IColumn & update_column = *block.getByPosition(attribute_index).column.get();
MutableColumnPtr saved_column = update_field_loaded_block->getByPosition(attribute_index).column->assumeMutable();
saved_column->insertRangeFrom(update_column, 0, update_column.size());
}
}
}
else
{
auto pipe = source_ptr->loadUpdatedAll();
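/// mergeBlockWithPipe (DictionaryHelpers.h) drops rows of update_field_loaded_block whose keys
/// reappear in the freshly loaded data and appends the new rows, so updated keys win.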
mergeBlockWithPipe<dictionary_key_type>(
dict_struct.getKeysSize(),
*update_field_loaded_block,
std::move(pipe));
}
if (update_field_loaded_block)
{
resize(update_field_loaded_block->rows());
blockToAttributes(*update_field_loaded_block.get());
}
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::blockToAttributes(const Block & block [[maybe_unused]])
{
size_t skip_keys_size_offset = dict_struct.getKeysSize();
Columns key_columns;
key_columns.reserve(skip_keys_size_offset);
/// Split into keys columns and attribute columns
for (size_t i = 0; i < skip_keys_size_offset; ++i)
key_columns.emplace_back(block.safeGetByPosition(i).column);
DictionaryKeysArenaHolder<dictionary_key_type> arena_holder;
DictionaryKeysExtractor<dictionary_key_type> keys_extractor(key_columns, arena_holder.getComplexKeyArena());
const size_t keys_size = keys_extractor.getKeysSize();
Field column_value_to_insert;
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
auto it = key_attribute.container.find(key);
if (it != key_attribute.container.end())
{
keys_extractor.rollbackCurrentKey();
continue;
}
if constexpr (std::is_same_v<KeyType, StringRef>)
key = copyKeyInArena(key);
key_attribute.container.insert({key, element_count});
for (size_t attribute_index = 0; attribute_index < attributes.size(); ++attribute_index)
{
const IColumn & attribute_column = *block.safeGetByPosition(skip_keys_size_offset + attribute_index).column;
auto & attribute = attributes[attribute_index];
bool attribute_is_nullable = attribute.is_index_null.has_value();
attribute_column.get(key_index, column_value_to_insert);
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using AttributeValueType = DictionaryValueType<AttributeType>;
auto & attribute_container = std::get<AttributeContainerType<AttributeValueType>>(attribute.container);
attribute_container.emplace_back();
if (attribute_is_nullable)
{
attribute.is_index_null->emplace_back();
if (column_value_to_insert.isNull())
{
(*attribute.is_index_null).back() = true;
return;
}
}
if constexpr (std::is_same_v<AttributeValueType, StringRef>)
{
String & value_to_insert = column_value_to_insert.get<String>();
size_t value_to_insert_size = value_to_insert.size();
const char * string_in_arena = attribute.string_arena->insert(value_to_insert.data(), value_to_insert_size);
StringRef string_in_arena_reference = StringRef{string_in_arena, value_to_insert_size};
attribute_container.back() = string_in_arena_reference;
}
else
{
auto value_to_insert = column_value_to_insert.get<NearestFieldType<AttributeValueType>>();
attribute_container.back() = value_to_insert;
}
};
callOnDictionaryAttributeType(attribute.type, type_call);
}
++element_count;
keys_extractor.rollbackCurrentKey();
}
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::resize(size_t added_rows)
{
if (unlikely(!added_rows))
return;
key_attribute.container.reserve(added_rows);
}
template <DictionaryKeyType dictionary_key_type>
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void HashedArrayDictionary<dictionary_key_type>::getItemsImpl(
const Attribute & attribute,
DictionaryKeysExtractor<dictionary_key_type> & keys_extractor,
ValueSetter && set_value [[maybe_unused]],
DefaultValueExtractor & default_value_extractor) const
{
const auto & key_attribute_container = key_attribute.container;
const auto & attribute_container = std::get<AttributeContainerType<AttributeType>>(attribute.container);
const size_t keys_size = keys_extractor.getKeysSize();
size_t keys_found = 0;
for (size_t key_index = 0; key_index < keys_size; ++key_index)
{
auto key = keys_extractor.extractCurrentKey();
const auto it = key_attribute_container.find(key);
if (it != key_attribute_container.end())
{
size_t element_index = it->getMapped();
const auto & element = attribute_container[element_index];
if constexpr (is_nullable)
set_value(key_index, element, (*attribute.is_index_null)[element_index]);
else
set_value(key_index, element, false);
++keys_found;
}
else
{
if constexpr (is_nullable)
set_value(key_index, default_value_extractor[key_index], default_value_extractor.isNullAt(key_index));
else
set_value(key_index, default_value_extractor[key_index], false);
}
keys_extractor.rollbackCurrentKey();
}
query_count.fetch_add(keys_size, std::memory_order_relaxed);
found_count.fetch_add(keys_found, std::memory_order_relaxed);
}
template <DictionaryKeyType dictionary_key_type>
StringRef HashedArrayDictionary<dictionary_key_type>::copyKeyInArena(StringRef key)
{
size_t key_size = key.size;
char * place_for_key = complex_key_arena.alloc(key_size);
memcpy(reinterpret_cast<void *>(place_for_key), reinterpret_cast<const void *>(key.data), key_size);
StringRef updated_key{place_for_key, key_size};
return updated_key;
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::loadData()
{
if (!source_ptr->hasUpdateField())
{
QueryPipeline pipeline(source_ptr->loadAll());
PullingPipelineExecutor executor(pipeline);
Block block;
while (executor.pull(block))
{
resize(block.rows());
blockToAttributes(block);
}
}
else
{
updateData();
}
if (configuration.require_nonempty && 0 == element_count)
throw Exception(ErrorCodes::DICTIONARY_IS_EMPTY,
"{}: dictionary source is empty and 'require_nonempty' property is set.",
full_name);
}
template <DictionaryKeyType dictionary_key_type>
void HashedArrayDictionary<dictionary_key_type>::calculateBytesAllocated()
{
bytes_allocated += attributes.size() * sizeof(attributes.front());
bytes_allocated += key_attribute.container.getBufferSizeInBytes();
for (auto & attribute : attributes)
{
auto type_call = [&](const auto & dictionary_attribute_type)
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
const auto & container = std::get<AttributeContainerType<ValueType>>(attribute.container);
bytes_allocated += sizeof(AttributeContainerType<ValueType>);
if constexpr (std::is_same_v<ValueType, Array>)
{
/// This is not an accurate calculation
bytes_allocated += sizeof(Array) * container.size();
}
else
{
bytes_allocated += container.allocated_bytes();
}
bucket_count = container.capacity();
if constexpr (std::is_same_v<ValueType, StringRef>)
bytes_allocated += sizeof(Arena) + attribute.string_arena->size();
};
callOnDictionaryAttributeType(attribute.type, type_call);
if (attribute.is_index_null.has_value())
bytes_allocated += (*attribute.is_index_null).size();
}
bytes_allocated += complex_key_arena.size();
if (update_field_loaded_block)
bytes_allocated += update_field_loaded_block->allocatedBytes();
}
template <DictionaryKeyType dictionary_key_type>
Pipe HashedArrayDictionary<dictionary_key_type>::read(const Names & column_names, size_t max_block_size) const
{
PaddedPODArray<HashedArrayDictionary::KeyType> keys;
keys.reserve(key_attribute.container.size());
for (auto & [key, _] : key_attribute.container)
keys.emplace_back(key);
return Pipe(std::make_shared<DictionarySource>(DictionarySourceData(shared_from_this(), std::move(keys), column_names), max_block_size));
}
template class HashedArrayDictionary<DictionaryKeyType::Simple>;
template class HashedArrayDictionary<DictionaryKeyType::Complex>;
void registerDictionaryArrayHashed(DictionaryFactory & factory)
{
auto create_layout = [](const std::string & full_name,
const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
DictionarySourcePtr source_ptr,
DictionaryKeyType dictionary_key_type) -> DictionaryPtr
{
if (dictionary_key_type == DictionaryKeyType::Simple && dict_struct.key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'key' is not supported for simple key hashed array dictionary");
else if (dictionary_key_type == DictionaryKeyType::Complex && dict_struct.id)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "'id' is not supported for complex key hashed array dictionary");
if (dict_struct.range_min || dict_struct.range_max)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"{}: elements .structure.range_min and .structure.range_max should be defined only "
"for a dictionary of layout 'range_hashed'",
full_name);
const auto dict_id = StorageID::fromDictionaryConfig(config, config_prefix);
const DictionaryLifetime dict_lifetime{config, config_prefix + ".lifetime"};
const bool require_nonempty = config.getBool(config_prefix + ".require_nonempty", false);
HashedArrayDictionaryStorageConfiguration configuration{require_nonempty, dict_lifetime};
if (dictionary_key_type == DictionaryKeyType::Simple)
return std::make_unique<HashedArrayDictionary<DictionaryKeyType::Simple>>(dict_id, dict_struct, std::move(source_ptr), configuration);
else
return std::make_unique<HashedArrayDictionary<DictionaryKeyType::Complex>>(dict_id, dict_struct, std::move(source_ptr), configuration);
};
factory.registerLayout("hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Simple); }, false);
factory.registerLayout("complex_key_hashed_array",
[=](auto && a, auto && b, auto && c, auto && d, DictionarySourcePtr e, ContextPtr /* global_context */, bool /*created_from_ddl*/){ return create_layout(a, b, c, d, std::move(e), DictionaryKeyType::Complex); }, true);
}
}

View File

@ -0,0 +1,211 @@
#pragma once
#include <atomic>
#include <memory>
#include <variant>
#include <optional>
#include <Common/SparseHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Core/Block.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryHelpers.h>
/** This dictionary stores all attributes in arrays.
  * A key is stored in a hash table, and its value is an index into the attribute arrays.
  */
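/** A sketch of the layout with illustrative (not real) values:
  *
  *   key_attribute.container : { "alpha" -> 0, "beta" -> 1, "gamma" -> 2 }
  *   attributes[0].container : [ 10, 20, 30 ]      // e.g. a UInt64 attribute
  *   attributes[1].container : [ "x", "y", "z" ]   // e.g. a String attribute
  *
  * One hash-table lookup yields an index that addresses every attribute array,
  * so no per-attribute hash tables are needed.
  */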
namespace DB
{
struct HashedArrayDictionaryStorageConfiguration
{
const bool require_nonempty;
const DictionaryLifetime lifetime;
};
template <DictionaryKeyType dictionary_key_type>
class HashedArrayDictionary final : public IDictionary
{
public:
using KeyType = std::conditional_t<dictionary_key_type == DictionaryKeyType::Simple, UInt64, StringRef>;
HashedArrayDictionary(
const StorageID & dict_id_,
const DictionaryStructure & dict_struct_,
DictionarySourcePtr source_ptr_,
const HashedArrayDictionaryStorageConfiguration & configuration_,
BlockPtr update_field_loaded_block_ = nullptr);
std::string getTypeName() const override
{
if constexpr (dictionary_key_type == DictionaryKeyType::Simple)
return "HashedArray";
else
return "ComplexHashedArray";
}
size_t getBytesAllocated() const override { return bytes_allocated; }
size_t getQueryCount() const override { return query_count.load(std::memory_order_relaxed); }
double getFoundRate() const override
{
size_t queries = query_count.load(std::memory_order_relaxed);
if (!queries)
return 0;
return static_cast<double>(found_count.load(std::memory_order_relaxed)) / queries;
}
double getHitRate() const override { return 1.0; }
size_t getElementCount() const override { return element_count; }
double getLoadFactor() const override { return static_cast<double>(element_count) / bucket_count; }
std::shared_ptr<const IExternalLoadable> clone() const override
{
return std::make_shared<HashedArrayDictionary<dictionary_key_type>>(getDictionaryID(), dict_struct, source_ptr->clone(), configuration, update_field_loaded_block);
}
const IDictionarySource * getSource() const override { return source_ptr.get(); }
const DictionaryLifetime & getLifetime() const override { return configuration.lifetime; }
const DictionaryStructure & getStructure() const override { return dict_struct; }
bool isInjective(const std::string & attribute_name) const override
{
return dict_struct.getAttribute(attribute_name).injective;
}
DictionaryKeyType getKeyType() const override { return dictionary_key_type; }
ColumnPtr getColumn(
const std::string & attribute_name,
const DataTypePtr & result_type,
const Columns & key_columns,
const DataTypes & key_types,
const ColumnPtr & default_values_column) const override;
ColumnUInt8::Ptr hasKeys(const Columns & key_columns, const DataTypes & key_types) const override;
bool hasHierarchy() const override { return dictionary_key_type == DictionaryKeyType::Simple && dict_struct.hierarchical_attribute_index.has_value(); }
ColumnPtr getHierarchy(ColumnPtr key_column, const DataTypePtr & hierarchy_attribute_type) const override;
ColumnUInt8::Ptr isInHierarchy(
ColumnPtr key_column,
ColumnPtr in_key_column,
const DataTypePtr & key_type) const override;
ColumnPtr getDescendants(
ColumnPtr key_column,
const DataTypePtr & key_type,
size_t level) const override;
Pipe read(const Names & column_names, size_t max_block_size) const override;
private:
using KeyContainerType = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashMap<UInt64, size_t>,
HashMapWithSavedHash<StringRef, size_t, DefaultHash<StringRef>>>;
template <typename Value>
using AttributeContainerType = std::conditional_t<std::is_same_v<Value, Array>, std::vector<Value>, PaddedPODArray<Value>>;
struct Attribute final
{
AttributeUnderlyingType type;
std::variant<
AttributeContainerType<UInt8>,
AttributeContainerType<UInt16>,
AttributeContainerType<UInt32>,
AttributeContainerType<UInt64>,
AttributeContainerType<UInt128>,
AttributeContainerType<UInt256>,
AttributeContainerType<Int8>,
AttributeContainerType<Int16>,
AttributeContainerType<Int32>,
AttributeContainerType<Int64>,
AttributeContainerType<Int128>,
AttributeContainerType<Int256>,
AttributeContainerType<Decimal32>,
AttributeContainerType<Decimal64>,
AttributeContainerType<Decimal128>,
AttributeContainerType<Decimal256>,
AttributeContainerType<Float32>,
AttributeContainerType<Float64>,
AttributeContainerType<UUID>,
AttributeContainerType<StringRef>,
AttributeContainerType<Array>>
container;
std::optional<std::vector<bool>> is_index_null;
std::unique_ptr<Arena> string_arena;
};
struct KeyAttribute final
{
KeyContainerType container;
};
void createAttributes();
void blockToAttributes(const Block & block);
void updateData();
void loadData();
void calculateBytesAllocated();
template <typename AttributeType, bool is_nullable, typename ValueSetter, typename DefaultValueExtractor>
void getItemsImpl(
const Attribute & attribute,
DictionaryKeysExtractor<dictionary_key_type> & keys_extractor,
ValueSetter && set_value,
DefaultValueExtractor & default_value_extractor) const;
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func);
template <typename GetContainerFunc>
void getAttributeContainer(size_t attribute_index, GetContainerFunc && get_container_func) const;
void resize(size_t added_rows);
StringRef copyKeyInArena(StringRef key);
const DictionaryStructure dict_struct;
const DictionarySourcePtr source_ptr;
const HashedArrayDictionaryStorageConfiguration configuration;
std::vector<Attribute> attributes;
KeyAttribute key_attribute;
size_t bytes_allocated = 0;
size_t element_count = 0;
size_t bucket_count = 0;
mutable std::atomic<size_t> query_count{0};
mutable std::atomic<size_t> found_count{0};
BlockPtr update_field_loaded_block;
Arena complex_key_arena;
};
extern template class HashedArrayDictionary<DictionaryKeyType::Simple>;
extern template class HashedArrayDictionary<DictionaryKeyType::Complex>;
}

Some files were not shown because too many files have changed in this diff.