Merge branch 'master' into csv_unquoted_nulls_and_default_values

commit 349d69c849
Author: tavplubix
Date: 2019-07-19 17:03:34 +03:00 (committed via GitHub)
77 changed files with 1170 additions and 442 deletions

View File

@@ -182,6 +182,11 @@ else ()
     set (CXX_FLAGS_INTERNAL_COMPILER "-std=c++1z")
 endif ()

+if (COMPILER_GCC OR COMPILER_CLANG)
+    # Enable C++14 sized global deallocation functions. It should be enabled by setting -std=c++14 but I'm not sure.
+    set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fsized-deallocation")
+endif ()
+
 option(WITH_COVERAGE "Build with coverage." 0)
 if(WITH_COVERAGE AND COMPILER_CLANG)
     set(COMPILER_FLAGS "${COMPILER_FLAGS} -fprofile-instr-generate -fcoverage-mapping")
@@ -220,7 +225,6 @@ endif ()
 set(THREADS_PREFER_PTHREAD_FLAG ON)
 find_package (Threads)

-include (cmake/find_cxxabi.cmake)
 include (cmake/find_cxx.cmake)
 include (cmake/test_compiler.cmake)
@@ -404,6 +408,11 @@ if (UNBUNDLED OR NOT (OS_LINUX OR APPLE) OR ARCH_32)
     option (NO_WERROR "Disable -Werror compiler option" ON)
 endif ()

+if (USE_LIBCXX)
+    set (HAVE_LIBCXX 1)
+    set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -stdlib=libc++")
+endif()
+
 if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
     set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -nostdinc++ -isystem ${LIBCXX_INCLUDE_DIR} -isystem ${LIBCXXABI_INCLUDE_DIR}")
 endif ()
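The -fsized-deallocation flag matters for the new_delete.cpp file added later in this commit: the two-argument operator delete lets a tracker learn the size of a freed block without querying the allocator. A minimal standalone sketch, not part of the commit, of what the flag enables:

// Hypothetical illustration: with -fsized-deallocation (or a conforming
// C++14 compiler), `delete p` on a complete type calls the sized form.
#include <cstdio>
#include <cstdlib>
#include <new>

void * operator new(std::size_t size)
{
    if (void * p = std::malloc(size))
        return p;
    throw std::bad_alloc{};
}

void operator delete(void * ptr) noexcept
{
    std::puts("unsized delete");
    std::free(ptr);
}

void operator delete(void * ptr, std::size_t size) noexcept
{
    // The compiler passes the static size of the deleted object here,
    // so a tracker can account the free without a size lookup.
    std::printf("sized delete: %zu bytes\n", size);
    std::free(ptr);
}

struct S { long a, b; };

int main()
{
    S * s = new S{};
    delete s; // prints "sized delete: 16 bytes" on a typical LP64 target
}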

View File

@@ -1,23 +1,26 @@
 if (NOT APPLE)
-    option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx library instead of bundled" ${NOT_UNBUNDLED})
+    option (USE_INTERNAL_LIBCXX_LIBRARY "Set to FALSE to use system libcxx and libcxxabi libraries instead of bundled" ${NOT_UNBUNDLED})
 endif ()

 if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxx/include/vector")
     message (WARNING "submodule contrib/libcxx is missing. to fix try run: \n git submodule update --init --recursive")
     set (USE_INTERNAL_LIBCXX_LIBRARY 0)
 endif ()

+if (USE_INTERNAL_LIBCXX_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/src")
+    message (WARNING "submodule contrib/libcxxabi is missing. to fix try run: \n git submodule update --init --recursive")
+    set (USE_INTERNAL_LIBCXXABI_LIBRARY 0)
+endif ()
+
 if (NOT USE_INTERNAL_LIBCXX_LIBRARY)
     find_library (LIBCXX_LIBRARY c++)
-    find_path (LIBCXX_INCLUDE_DIR NAMES vector PATHS ${LIBCXX_INCLUDE_PATHS})
-endif ()
-
-if (LIBCXX_LIBRARY AND LIBCXX_INCLUDE_DIR)
+    find_library (LIBCXXABI_LIBRARY c++abi)
 else ()
     set (LIBCXX_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxx/include)
-    set (USE_INTERNAL_LIBCXX_LIBRARY 1)
+    set (LIBCXXABI_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/include)
     set (LIBCXX_LIBRARY cxx_static)
-    set (HAVE_LIBCXX 1)
+    set (LIBCXXABI_LIBRARY cxxabi_static)
 endif ()

-message (STATUS "Using libcxx: ${LIBCXX_INCLUDE_DIR} : ${LIBCXX_LIBRARY}")
+message (STATUS "Using libcxx: ${LIBCXX_LIBRARY}")
+message (STATUS "Using libcxxabi: ${LIBCXXABI_LIBRARY}")

View File

@@ -1,22 +0,0 @@
-if (NOT APPLE)
-    option (USE_INTERNAL_LIBCXXABI_LIBRARY "Set to FALSE to use system libcxxabi library instead of bundled" ${NOT_UNBUNDLED})
-endif ()
-
-if (USE_INTERNAL_LIBCXXABI_LIBRARY AND NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/src")
-    message (WARNING "submodule contrib/libcxxabi is missing. to fix try run: \n git submodule update --init --recursive")
-    set (USE_INTERNAL_LIBCXXABI_LIBRARY 0)
-endif ()
-
-if (NOT USE_INTERNAL_LIBCXXABI_LIBRARY)
-    find_library (LIBCXXABI_LIBRARY cxxabi)
-    find_path (LIBCXXABI_INCLUDE_DIR NAMES vector PATHS ${LIBCXXABI_INCLUDE_PATHS})
-endif ()
-
-if (LIBCXXABI_LIBRARY AND LIBCXXABI_INCLUDE_DIR)
-else ()
-    set (LIBCXXABI_INCLUDE_DIR ${ClickHouse_SOURCE_DIR}/contrib/libcxxabi/include)
-    set (USE_INTERNAL_LIBCXXABI_LIBRARY 1)
-    set (LIBCXXABI_LIBRARY cxxabi_static)
-endif ()
-
-message (STATUS "Using libcxxabi: ${LIBCXXABI_INCLUDE_DIR} : ${LIBCXXABI_LIBRARY}")

View File

@@ -15,12 +15,9 @@ if (USE_INTERNAL_UNWIND_LIBRARY)
     add_subdirectory (libunwind-cmake)
 endif ()

-if (USE_LIBCXX AND USE_INTERNAL_LIBCXXABI_LIBRARY)
-    add_subdirectory(libcxxabi-cmake)
-endif()
-
 if (USE_LIBCXX AND USE_INTERNAL_LIBCXX_LIBRARY)
     add_subdirectory(libcxx-cmake)
+    add_subdirectory(libcxxabi-cmake)
 endif()

View File

@@ -15,7 +15,6 @@ ${JEMALLOC_SOURCE_DIR}/src/extent_mmap.c
 ${JEMALLOC_SOURCE_DIR}/src/hash.c
 ${JEMALLOC_SOURCE_DIR}/src/hook.c
 ${JEMALLOC_SOURCE_DIR}/src/jemalloc.c
-${JEMALLOC_SOURCE_DIR}/src/jemalloc_cpp.cpp
 ${JEMALLOC_SOURCE_DIR}/src/large.c
 ${JEMALLOC_SOURCE_DIR}/src/log.c
 ${JEMALLOC_SOURCE_DIR}/src/malloc_io.c

View File

@@ -385,6 +385,7 @@ endif()
 if (USE_JEMALLOC)
     target_include_directories (dbms SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # used in Interpreters/AsynchronousMetrics.cpp
+    target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${JEMALLOC_INCLUDE_DIR}) # new_delete.cpp
 endif ()

 target_include_directories (dbms PUBLIC ${DBMS_INCLUDE_DIR} PRIVATE ${CMAKE_CURRENT_BINARY_DIR}/src/Formats/include)

View File

@@ -28,6 +28,7 @@
 #include <Storages/ColumnDefault.h>
 #include <DataTypes/DataTypeLowCardinality.h>
 #include <Compression/CompressionFactory.h>
+#include <common/logger_useful.h>

 #include <Processors/Formats/LazyOutputFormat.h>
@@ -172,12 +173,13 @@ void TCPHandler::runImpl()
         send_exception_with_stack_trace = query_context->getSettingsRef().calculate_text_stack_trace;

         /// Should we send internal logs to client?
+        const auto client_logs_level = query_context->getSettingsRef().send_logs_level;
         if (client_revision >= DBMS_MIN_REVISION_WITH_SERVER_LOGS
-            && query_context->getSettingsRef().send_logs_level.value != LogsLevel::none)
+            && client_logs_level.value != LogsLevel::none)
         {
             state.logs_queue = std::make_shared<InternalTextLogsQueue>();
-            state.logs_queue->max_priority = Poco::Logger::parseLevel(query_context->getSettingsRef().send_logs_level.toString());
-            CurrentThread::attachInternalTextLogsQueue(state.logs_queue);
+            state.logs_queue->max_priority = Poco::Logger::parseLevel(client_logs_level.toString());
+            CurrentThread::attachInternalTextLogsQueue(state.logs_queue, client_logs_level.value);
         }

         query_context->setExternalTablesInitializer([&global_settings, this] (Context & context)

View File

@@ -1,7 +1,6 @@
 #pragma once

 #include <Common/HashTable/Hash.h>
-#include <Common/MemoryTracker.h>
 #include <Common/PODArray.h>
 #include <IO/ReadBuffer.h>
 #include <IO/WriteBuffer.h>
@@ -513,8 +512,6 @@ private:
     void mediumToLarge()
     {
-        CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
-
         /// While the data is copied from medium, it is not possible to set `large` value (otherwise it will overwrite some data).
         detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@@ -528,8 +525,6 @@ private:
     void tinyToLarge()
     {
-        CurrentMemoryTracker::alloc(sizeof(detail::QuantileTimingLarge));
-
         /// While the data is copied from `medium` it is not possible to set `large` value (otherwise it will overwrite some data).
         detail::QuantileTimingLarge * tmp_large = new detail::QuantileTimingLarge;
@@ -562,8 +557,6 @@ public:
         else if (kind == Kind::Large)
         {
             delete large;
-
-            CurrentMemoryTracker::free(sizeof(detail::QuantileTimingLarge));
         }
     }

View File

@@ -108,13 +108,92 @@ class AllocatorWithHint : Hint
 {
 protected:
     static constexpr bool clear_memory = clear_memory_;
+    static constexpr size_t small_memory_threshold = mmap_threshold;

 public:
     /// Allocate memory range.
     void * alloc(size_t size, size_t alignment = 0)
     {
         CurrentMemoryTracker::alloc(size);
+        return allocNoTrack(size, alignment);
+    }
+
+    /// Free memory range.
+    void free(void * buf, size_t size)
+    {
+        freeNoTrack(buf, size);
+        CurrentMemoryTracker::free(size);
+    }
+
+    /** Enlarge memory range.
+      * Data from old range is moved to the beginning of new range.
+      * Address of memory range could change.
+      */
+    void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
+    {
+        if (old_size == new_size)
+        {
+            /// nothing to do.
+            /// BTW, it's not possible to change alignment while doing realloc.
+        }
+        else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
+        {
+            /// Resize malloc'd memory region with no special alignment requirement.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
+            void * new_buf = ::realloc(buf, new_size);
+            if (nullptr == new_buf)
+                DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
+
+            buf = new_buf;
+            if constexpr (clear_memory)
+                if (new_size > old_size)
+                    memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
+        }
+        else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
+        {
+            /// Resize mmap'd memory region.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
+            // On apple and freebsd self-implemented mremap used (common/mremap.h)
+            buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+            if (MAP_FAILED == buf)
+                DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
+
+            /// No need for zero-fill, because mmap guarantees it.
+        }
+        else if (new_size < small_memory_threshold)
+        {
+            /// Small allocs that require a copy. Assume there's enough memory in system. Call CurrentMemoryTracker once.
+            CurrentMemoryTracker::realloc(old_size, new_size);
+
+            void * new_buf = allocNoTrack(new_size, alignment);
+            memcpy(new_buf, buf, std::min(old_size, new_size));
+            freeNoTrack(buf, old_size);
+            buf = new_buf;
+        }
+        else
+        {
+            /// Big allocs that require a copy. MemoryTracker is called inside 'alloc', 'free' methods.
+
+            void * new_buf = alloc(new_size, alignment);
+            memcpy(new_buf, buf, std::min(old_size, new_size));
+            free(buf, old_size);
+            buf = new_buf;
+        }
+
+        return buf;
+    }
+
+protected:
+    static constexpr size_t getStackThreshold()
+    {
+        return 0;
+    }
+
+private:
+    void * allocNoTrack(size_t size, size_t alignment)
+    {
         void * buf;

         if (size >= mmap_threshold)
@@ -149,15 +228,14 @@ public:
             if (0 != res)
                 DB::throwFromErrno("Cannot allocate memory (posix_memalign) " + formatReadableSizeWithBinarySuffix(size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY, res);

-            if (clear_memory)
+            if constexpr (clear_memory)
                 memset(buf, 0, size);
             }
         }
         return buf;
     }

-    /// Free memory range.
-    void free(void * buf, size_t size)
+    void freeNoTrack(void * buf, size_t size)
     {
         if (size >= mmap_threshold)
         {
@@ -168,63 +246,6 @@ public:
         {
             ::free(buf);
         }
-
-        CurrentMemoryTracker::free(size);
-    }
-
-    /** Enlarge memory range.
-      * Data from old range is moved to the beginning of new range.
-      * Address of memory range could change.
-      */
-    void * realloc(void * buf, size_t old_size, size_t new_size, size_t alignment = 0)
-    {
-        if (old_size == new_size)
-        {
-            /// nothing to do.
-            /// BTW, it's not possible to change alignment while doing realloc.
-        }
-        else if (old_size < mmap_threshold && new_size < mmap_threshold && alignment <= MALLOC_MIN_ALIGNMENT)
-        {
-            /// Resize malloc'd memory region with no special alignment requirement.
-            CurrentMemoryTracker::realloc(old_size, new_size);
-
-            void * new_buf = ::realloc(buf, new_size);
-            if (nullptr == new_buf)
-                DB::throwFromErrno("Allocator: Cannot realloc from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_ALLOCATE_MEMORY);
-
-            buf = new_buf;
-            if (clear_memory && new_size > old_size)
-                memset(reinterpret_cast<char *>(buf) + old_size, 0, new_size - old_size);
-        }
-        else if (old_size >= mmap_threshold && new_size >= mmap_threshold)
-        {
-            /// Resize mmap'd memory region.
-            CurrentMemoryTracker::realloc(old_size, new_size);
-
-            // On apple and freebsd self-implemented mremap used (common/mremap.h)
-            buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
-            if (MAP_FAILED == buf)
-                DB::throwFromErrno("Allocator: Cannot mremap memory chunk from " + formatReadableSizeWithBinarySuffix(old_size) + " to " + formatReadableSizeWithBinarySuffix(new_size) + ".", DB::ErrorCodes::CANNOT_MREMAP);
-
-            /// No need for zero-fill, because mmap guarantees it.
-        }
-        else
-        {
-            /// All other cases that requires a copy. MemoryTracker is called inside 'alloc', 'free' methods.
-
-            void * new_buf = alloc(new_size, alignment);
-            memcpy(new_buf, buf, std::min(old_size, new_size));
-            free(buf, old_size);
-            buf = new_buf;
-        }
-
-        return buf;
-    }
-
-protected:
-    static constexpr size_t getStackThreshold()
-    {
-        return 0;
     }

 };
@@ -267,7 +288,7 @@ public:
     {
         if (size <= N)
         {
-            if (Base::clear_memory)
+            if constexpr (Base::clear_memory)
                 memset(stack_memory, 0, N);
             return stack_memory;
         }
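For orientation, the branch structure of the rewritten realloc above, stripped of the tracking calls and ClickHouse error handling, reduces to the following standalone sketch (hypothetical names, not the actual allocator):

// Simplified sketch: same-size is a no-op, small-to-small uses ::realloc,
// and anything crossing the threshold falls back to allocate-copy-free
// (the real allocator uses mremap for the large-to-large case instead).
#include <algorithm>
#include <cstdlib>
#include <cstring>

static constexpr size_t threshold = 64 * 1024; // stand-in for mmap_threshold

void * resize(void * buf, size_t old_size, size_t new_size)
{
    if (old_size == new_size)
        return buf; // alignment cannot change during realloc anyway

    if (old_size < threshold && new_size < threshold)
        return std::realloc(buf, new_size); // may extend in place

    void * new_buf = std::malloc(new_size); // real code: mmap / allocNoTrack
    if (new_buf != nullptr)
    {
        std::memcpy(new_buf, buf, std::min(old_size, new_size));
        std::free(buf);
    }
    return new_buf;
}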

View File

@@ -3,7 +3,6 @@
 #include <Common/HashTable/SmallTable.h>
 #include <Common/HashTable/HashSet.h>
 #include <Common/HyperLogLogCounter.h>
-#include <Common/MemoryTracker.h>
 #include <Core/Defines.h>
@@ -230,7 +229,6 @@ private:
         if (getContainerType() != details::ContainerType::SMALL)
             throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);

-        CurrentMemoryTracker::alloc(sizeof(Medium));
         auto tmp_medium = std::make_unique<Medium>();

         for (const auto & x : small)
@@ -247,7 +245,6 @@ private:
         if ((container_type != details::ContainerType::SMALL) && (container_type != details::ContainerType::MEDIUM))
             throw Poco::Exception("Internal error", ErrorCodes::LOGICAL_ERROR);

-        CurrentMemoryTracker::alloc(sizeof(Large));
         auto tmp_large = std::make_unique<Large>();

         if (container_type == details::ContainerType::SMALL)
@@ -277,15 +274,11 @@ private:
         {
             delete medium;
             medium = nullptr;
-
-            CurrentMemoryTracker::free(sizeof(Medium));
         }
         else if (container_type == details::ContainerType::LARGE)
         {
             delete large;
             large = nullptr;
-
-            CurrentMemoryTracker::free(sizeof(Large));
         }
     }

View File

@@ -46,6 +46,12 @@ MemoryTracker * CurrentThread::getMemoryTracker()
     return &current_thread->memory_tracker;
 }

+Int64 & CurrentThread::getUntrackedMemory()
+{
+    /// It assumes that (current_thread != nullptr) is already checked with getMemoryTracker()
+    return current_thread->untracked_memory;
+}
+
 void CurrentThread::updateProgressIn(const Progress & value)
 {
     if (unlikely(!current_thread))
@@ -60,11 +66,12 @@ void CurrentThread::updateProgressOut(const Progress & value)
     current_thread->progress_out.incrementPiecewiseAtomically(value);
 }

-void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue)
+void CurrentThread::attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
+                                                LogsLevel client_logs_level)
 {
     if (unlikely(!current_thread))
         return;
-    current_thread->attachInternalTextLogsQueue(logs_queue);
+    current_thread->attachInternalTextLogsQueue(logs_queue, client_logs_level);
 }

 std::shared_ptr<InternalTextLogsQueue> CurrentThread::getInternalTextLogsQueue()

View File

@@ -39,7 +39,8 @@ public:
     static ThreadGroupStatusPtr getGroup();

     /// A logs queue used by TCPHandler to pass logs to a client
-    static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue);
+    static void attachInternalTextLogsQueue(const std::shared_ptr<InternalTextLogsQueue> & logs_queue,
+                                            LogsLevel client_logs_level);
     static std::shared_ptr<InternalTextLogsQueue> getInternalTextLogsQueue();

     /// Makes system calls to update ProfileEvents that contain info from rusage and taskstats
@@ -47,6 +48,7 @@ public:
     static ProfileEvents::Counters & getProfileEvents();
     static MemoryTracker * getMemoryTracker();
+    static Int64 & getUntrackedMemory();

     /// Update read and write rows (bytes) statistics (used in system.query_thread_log)
     static void updateProgressIn(const Progress & value);

View File

@@ -4,7 +4,6 @@
 #include <Common/HyperLogLogCounter.h>
 #include <Common/HashTable/SmallTable.h>
-#include <Common/MemoryTracker.h>

 namespace DB
@@ -39,8 +38,6 @@ private:
     void toLarge()
     {
-        CurrentMemoryTracker::alloc(sizeof(Large));
-
         /// At the time of copying data from `tiny`, setting the value of `large` is still not possible (otherwise it will overwrite some data).
         Large * tmp_large = new Large;
@@ -56,11 +53,7 @@ public:
     ~HyperLogLogWithSmallSetOptimization()
     {
         if (isLarge())
-        {
             delete large;
-
-            CurrentMemoryTracker::free(sizeof(Large));
-        }
     }

     void insert(Key value)

View File

@@ -1,3 +1,5 @@
+#include <cstdlib>
+
 #include "MemoryTracker.h"
 #include <common/likely.h>
 #include <common/logger_useful.h>
@@ -17,6 +19,8 @@ namespace DB
 static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;

+/// Each thread could new/delete memory in range of (-untracked_memory_limit, untracked_memory_limit) without access to common counters.
+static constexpr Int64 untracked_memory_limit = 4 * 1024 * 1024;

 MemoryTracker::~MemoryTracker()
@@ -85,6 +89,9 @@ void MemoryTracker::alloc(Int64 size)
     {
         free(size);

+        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
+        auto untrack_lock = blocker.cancel();
+
         std::stringstream message;
         message << "Memory tracker";
         if (description)
@@ -100,6 +107,9 @@ void MemoryTracker::alloc(Int64 size)
     {
         free(size);

+        /// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
+        auto untrack_lock = blocker.cancel();
+
         std::stringstream message;
         message << "Memory limit";
         if (description)
@@ -191,19 +201,41 @@ namespace CurrentMemoryTracker
     void alloc(Int64 size)
     {
         if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
-            memory_tracker->alloc(size);
+        {
+            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
+            untracked += size;
+            if (untracked > untracked_memory_limit)
+            {
+                /// Zero untracked before track. If tracker throws out-of-limit we would be able to alloc up to untracked_memory_limit bytes
+                /// more. It could be useful for enlarging the Exception message in rethrow logic.
+                Int64 tmp = untracked;
+                untracked = 0;
+                memory_tracker->alloc(tmp);
+            }
+        }
     }

     void realloc(Int64 old_size, Int64 new_size)
     {
-        if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
-            memory_tracker->alloc(new_size - old_size);
+        Int64 addition = new_size - old_size;
+        if (addition > 0)
+            alloc(addition);
+        else
+            free(-addition);
     }

     void free(Int64 size)
     {
         if (auto memory_tracker = DB::CurrentThread::getMemoryTracker())
-            memory_tracker->free(size);
+        {
+            Int64 & untracked = DB::CurrentThread::getUntrackedMemory();
+            untracked -= size;
+            if (untracked < -untracked_memory_limit)
+            {
+                memory_tracker->free(-untracked);
+                untracked = 0;
+            }
+        }
     }
 }
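The effect of untracked_memory is easiest to see in isolation: every small new/delete adjusts a plain thread-local counter, and the shared (contended) tracker is touched only when the accumulated delta leaves the 4 MiB window. A standalone sketch with hypothetical names, not the actual MemoryTracker/CurrentThread classes:

#include <atomic>
#include <cstdint>
#include <cstdio>

std::atomic<int64_t> global_memory{0};      // shared counter (contended in real code)
thread_local int64_t untracked = 0;         // per-thread, atomic-less delta
constexpr int64_t limit = 4 * 1024 * 1024;  // as in the commit

void track_alloc(int64_t size)
{
    untracked += size;
    if (untracked > limit)
    {
        // Zero the local delta before publishing: if the real tracker threw
        // here, the thread would still have up to `limit` bytes of slack
        // for building the exception message.
        int64_t tmp = untracked;
        untracked = 0;
        global_memory.fetch_add(tmp, std::memory_order_relaxed);
    }
}

void track_free(int64_t size)
{
    untracked -= size;
    if (untracked < -limit)
    {
        global_memory.fetch_sub(-untracked, std::memory_order_relaxed);
        untracked = 0;
    }
}

int main()
{
    for (int i = 0; i < 1000; ++i) track_alloc(8192); // 2000 tracking events,
    for (int i = 0; i < 1000; ++i) track_free(8192);  // only a few atomic ops
    std::printf("global=%lld untracked=%lld\n",
                static_cast<long long>(global_memory.load()),
                static_cast<long long>(untracked));
}

On thread exit the remainder has to be flushed, which is exactly what the new ThreadStatus::~ThreadStatus below does; otherwise up to 4 MiB per thread would stay unaccounted.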

View File

@@ -45,7 +45,11 @@ public:
     void realloc(Int64 old_size, Int64 new_size)
     {
-        alloc(new_size - old_size);
+        Int64 addition = new_size - old_size;
+        if (addition > 0)
+            alloc(addition);
+        else
+            free(-addition);
     }

     /** This function should be called after memory deallocation.

View File

@@ -34,6 +34,11 @@ namespace ErrorCodes
     extern const int LOGICAL_ERROR;
 }

+// Replace NLMSG_OK with explicit casts since that system macro contains signedness bugs which are not going to be fixed.
+static inline bool is_nlmsg_ok(const struct nlmsghdr * const nlh, const ssize_t len)
+{
+    return len >= static_cast<ssize_t>(sizeof(*nlh)) && nlh->nlmsg_len >= sizeof(*nlh) && static_cast<size_t>(len) >= nlh->nlmsg_len;
+}

 namespace
 {
@@ -128,7 +133,7 @@ struct NetlinkMessage
         if (header.nlmsg_type == NLMSG_ERROR)
             throw Exception("Can't receive Netlink response: error " + std::to_string(error.error), ErrorCodes::NETLINK_ERROR);

-        if (!NLMSG_OK((&header), bytes_received))
+        if (!is_nlmsg_ok(&header, bytes_received))
             throw Exception("Can't receive Netlink response: wrong number of bytes received", ErrorCodes::NETLINK_ERROR);
     }
 };
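The signedness trap that is_nlmsg_ok avoids can be reproduced in a few lines; a hedged standalone illustration of the conversion rule (not the actual macro expansion):

// In a mixed int/unsigned comparison the int converts to unsigned, so a
// negative length from a failed recv() can sail past an NLMSG_OK-style check.
#include <cstdio>

int main()
{
    int len = -1;                 // e.g. a failed recv() narrowed to int
    unsigned int nlmsg_len = 16;  // length field from the netlink header
    // -1 becomes 4294967295 here, so the bogus length is "accepted".
    std::printf("naive : %s\n", (len >= nlmsg_len) ? "accepted (!)" : "rejected");
    // The helper's style: validate the signed bound first, then cast explicitly.
    bool ok = len >= static_cast<int>(sizeof(unsigned int))
           && static_cast<unsigned int>(len) >= nlmsg_len;
    std::printf("casted: %s\n", ok ? "accepted" : "rejected");
}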

View File

@@ -50,6 +50,19 @@ ThreadStatus::ThreadStatus()

 ThreadStatus::~ThreadStatus()
 {
+    try
+    {
+        if (untracked_memory > 0)
+            memory_tracker.alloc(untracked_memory);
+        else
+            memory_tracker.free(-untracked_memory);
+    }
+    catch (const DB::Exception &)
+    {
+        /// It's a minor tracked memory leak here (not the memory itself but its counter).
+        /// We've already allocated a little bit more than the limit and cannot track it in the thread memory tracker or its parent.
+    }
+
     if (deleter)
         deleter();
     current_thread = nullptr;
@@ -117,7 +130,8 @@ void ThreadStatus::assertState(const std::initializer_list<int> & permitted_stat
         throw Exception(ss.str(), ErrorCodes::LOGICAL_ERROR);
 }

-void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue)
+void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
+                                               LogsLevel client_logs_level)
 {
     logs_queue_ptr = logs_queue;

@@ -126,6 +140,7 @@ void ThreadStatus::attachInternalTextLogsQueue(const InternalTextLogsQueuePtr &
     std::lock_guard lock(thread_group->mutex);
     thread_group->logs_queue_ptr = logs_queue;
+    thread_group->client_logs_level = client_logs_level;
     }
 }

View File

@@ -4,6 +4,8 @@
 #include <Common/ProfileEvents.h>
 #include <Common/MemoryTracker.h>

+#include <Core/SettingsCommon.h>
+
 #include <IO/Progress.h>

 #include <memory>
@@ -62,6 +64,8 @@ public:
     UInt32 master_thread_number = 0;
     Int32 master_thread_os_id = -1;

+    LogsLevel client_logs_level = LogsLevel::none;
+
     String query;
 };
@@ -92,6 +96,8 @@ public:
     /// TODO: merge them into common entity
     ProfileEvents::Counters performance_counters{VariableContext::Thread};
     MemoryTracker memory_tracker{VariableContext::Thread};
+    /// Small amount of untracked memory (per thread atomic-less counter)
+    Int64 untracked_memory = 0;

     /// Statistics of read and write rows/bytes
     Progress progress_in;
@@ -130,7 +136,8 @@ public:
         return thread_state == Died ? nullptr : logs_queue_ptr.lock();
     }

-    void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue);
+    void attachInternalTextLogsQueue(const InternalTextLogsQueuePtr & logs_queue,
+                                     LogsLevel client_logs_level);

     /// Sets query context for current thread and its thread group
     /// NOTE: query_context have to be alive until detachQuery() is called

View File

@@ -0,0 +1,143 @@
+#include <new>
+
+#include <common/config_common.h>
+#include <common/memory.h>
+
+#include <Common/MemoryTracker.h>
+
+/// Replace default new/delete with memory tracking versions.
+/// @sa https://en.cppreference.com/w/cpp/memory/new/operator_new
+///     https://en.cppreference.com/w/cpp/memory/new/operator_delete
+
+#if NOT_UNBUNDLED
+
+namespace Memory
+{
+
+ALWAYS_INLINE void trackMemory(std::size_t size)
+{
+#if USE_JEMALLOC
+    /// The nallocx() function allocates no memory, but it performs the same size computation as the mallocx() function
+    /// @note je_mallocx() != je_malloc(). It's expected they don't differ much in allocation logic.
+    if (likely(size != 0))
+        CurrentMemoryTracker::alloc(nallocx(size, 0));
+#else
+    CurrentMemoryTracker::alloc(size);
+#endif
+}
+
+ALWAYS_INLINE bool trackMemoryNoExept(std::size_t size) noexcept
+{
+    try
+    {
+        trackMemory(size);
+    }
+    catch (...)
+    {
+        return false;
+    }
+
+    return true;
+}
+
+ALWAYS_INLINE void untrackMemory(void * ptr [[maybe_unused]], std::size_t size [[maybe_unused]] = 0) noexcept
+{
+    try
+    {
+#if USE_JEMALLOC
+        /// @note It's also possible to use je_malloc_usable_size() here.
+        if (likely(ptr != nullptr))
+            CurrentMemoryTracker::free(sallocx(ptr, 0));
+#else
+        if (size)
+            CurrentMemoryTracker::free(size);
+#endif
+    }
+    catch (...)
+    {}
+}
+
+}
+
+/// new
+
+void * operator new(std::size_t size)
+{
+    Memory::trackMemory(size);
+    return Memory::newImpl(size);
+}
+
+void * operator new[](std::size_t size)
+{
+    Memory::trackMemory(size);
+    return Memory::newImpl(size);
+}
+
+void * operator new(std::size_t size, const std::nothrow_t &) noexcept
+{
+    if (likely(Memory::trackMemoryNoExept(size)))
+        return Memory::newNoExept(size);
+    return nullptr;
+}
+
+void * operator new[](std::size_t size, const std::nothrow_t &) noexcept
+{
+    if (likely(Memory::trackMemoryNoExept(size)))
+        return Memory::newNoExept(size);
+    return nullptr;
+}
+
+/// delete
+
+/// C++17 std 21.6.2.1 (11)
+/// If a function without a size parameter is defined, the program should also define the corresponding function with a size parameter.
+/// If a function with a size parameter is defined, the program shall also define the corresponding version without the size parameter.
+
+/// cppreference:
+/// It's unspecified whether size-aware or size-unaware version is called when deleting objects of
+/// incomplete type and arrays of non-class and trivially-destructible class types.
+
+void operator delete(void * ptr) noexcept
+{
+    Memory::untrackMemory(ptr);
+    Memory::deleteImpl(ptr);
+}
+
+void operator delete[](void * ptr) noexcept
+{
+    Memory::untrackMemory(ptr);
+    Memory::deleteImpl(ptr);
+}
+
+void operator delete(void * ptr, std::size_t size) noexcept
+{
+    Memory::untrackMemory(ptr, size);
+    Memory::deleteSized(ptr, size);
+}
+
+void operator delete[](void * ptr, std::size_t size) noexcept
+{
+    Memory::untrackMemory(ptr, size);
+    Memory::deleteSized(ptr, size);
+}
+
+#else
+
+/// new
+
+void * operator new(std::size_t size) { return Memory::newImpl(size); }
+void * operator new[](std::size_t size) { return Memory::newImpl(size); }
+
+void * operator new(std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
+void * operator new[](std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
+
+/// delete
+
+void operator delete(void * ptr) noexcept { Memory::deleteImpl(ptr); }
+void operator delete[](void * ptr) noexcept { Memory::deleteImpl(ptr); }
+
+void operator delete(void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
+void operator delete[](void * ptr, const std::nothrow_t &) noexcept { Memory::deleteImpl(ptr); }
+
+void operator delete(void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
+void operator delete[](void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
+
+#endif
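A usage sketch of the jemalloc size queries that trackMemory/untrackMemory lean on, assuming a jemalloc build with the unprefixed non-standard API (nallocx and friends): nallocx() performs the size-class rounding without allocating, and sallocx() reports the real size of a live allocation, so the tracker accounts physical rather than requested bytes.

// Hedged illustration; requires linking against jemalloc.
#include <cstdio>
#include <jemalloc/jemalloc.h>

int main()
{
    size_t would_be = nallocx(100, 0); // e.g. 112: jemalloc's size class for 100 bytes
    void * p = mallocx(100, 0);
    size_t real = sallocx(p, 0);       // same value, queried from the live pointer
    std::printf("requested 100, reserved %zu, live %zu\n", would_be, real);
    dallocx(p, 0);
}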

View File

@@ -334,7 +334,11 @@ struct Settings : public SettingsCollection<Settings>
     M(SettingBool, allow_simdjson, true, "Allow using simdjson library in 'JSON*' functions if AVX2 instructions are available. If disabled rapidjson will be used.") \
     \
     M(SettingUInt64, max_partitions_per_insert_block, 100, "Limit maximum number of partitions in single INSERTed block. Zero means unlimited. Throw exception if the block contains too many partitions. This setting is a safety threshold, because using large number of partitions is a common misconception.") \
-    M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value")
+    M(SettingBool, check_query_single_value_result, true, "Return check query result as single 1/0 value") \
+    \
+    /** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
+    \
+    M(SettingBool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13")

 DECLARE_SETTINGS_COLLECTION(LIST_OF_SETTINGS)

View File

@@ -30,8 +30,6 @@ public:
         const DictionaryLifetime dict_lifetime,
         const size_t size);

-    std::exception_ptr getCreationException() const override { return {}; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "Cache"; }
@@ -62,8 +60,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@@ -284,8 +280,6 @@ private:
     mutable std::atomic<size_t> element_count{0};
     mutable std::atomic<size_t> hit_count{0};
     mutable std::atomic<size_t> query_count{0};
-
-    const std::chrono::time_point<std::chrono::system_clock> creation_time = std::chrono::system_clock::now();
 };

 }

View File

@@ -50,8 +50,6 @@ public:

     std::string getKeyDescription() const { return key_description; }

-    std::exception_ptr getCreationException() const override { return {}; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "ComplexKeyCache"; }
@@ -86,8 +84,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;

View File

@@ -29,18 +29,8 @@ ComplexKeyHashedDictionary::ComplexKeyHashedDictionary(
     , saved_block{std::move(saved_block)}
 {
     createAttributes();
-
-    try
-    {
-        loadData();
-        calculateBytesAllocated();
-    }
-    catch (...)
-    {
-        creation_exception = std::current_exception();
-    }
-
-    creation_time = std::chrono::system_clock::now();
+    loadData();
+    calculateBytesAllocated();
 }

 #define DECLARE(TYPE) \

View File

@@ -32,8 +32,6 @@ public:

     std::string getKeyDescription() const { return key_description; }

-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "ComplexKeyHashed"; }
@@ -61,8 +59,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@@ -255,10 +251,6 @@ private:
     size_t bucket_count = 0;
     mutable std::atomic<size_t> query_count{0};

-    std::chrono::time_point<std::chrono::system_clock> creation_time;
-
-    std::exception_ptr creation_exception;
-
     BlockPtr saved_block;
 };

View File

@@ -36,18 +36,8 @@ FlatDictionary::FlatDictionary(
     , saved_block{std::move(saved_block)}
 {
     createAttributes();
-
-    try
-    {
-        loadData();
-        calculateBytesAllocated();
-    }
-    catch (...)
-    {
-        creation_exception = std::current_exception();
-    }
-
-    creation_time = std::chrono::system_clock::now();
+    loadData();
+    calculateBytesAllocated();
 }

View File

@@ -29,8 +29,6 @@ public:
         bool require_nonempty,
         BlockPtr saved_block = nullptr);

-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "Flat"; }
@@ -58,8 +56,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@@ -244,10 +240,6 @@ private:
     size_t bucket_count = 0;
     mutable std::atomic<size_t> query_count{0};

-    std::chrono::time_point<std::chrono::system_clock> creation_time;
-
-    std::exception_ptr creation_exception;
-
     BlockPtr saved_block;
 };

View File

@@ -30,18 +30,8 @@ HashedDictionary::HashedDictionary(
     , saved_block{std::move(saved_block)}
 {
     createAttributes();
-
-    try
-    {
-        loadData();
-        calculateBytesAllocated();
-    }
-    catch (...)
-    {
-        creation_exception = std::current_exception();
-    }
-
-    creation_time = std::chrono::system_clock::now();
+    loadData();
+    calculateBytesAllocated();
 }

View File

@@ -28,8 +28,6 @@ public:
         bool require_nonempty,
         BlockPtr saved_block = nullptr);

-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "Hashed"; }
@@ -57,8 +55,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@@ -248,10 +244,6 @@ private:
     size_t bucket_count = 0;
     mutable std::atomic<size_t> query_count{0};

-    std::chrono::time_point<std::chrono::system_clock> creation_time;
-
-    std::exception_ptr creation_exception;
-
     BlockPtr saved_block;
 };

View File

@@ -80,18 +80,8 @@ RangeHashedDictionary::RangeHashedDictionary(
     , require_nonempty(require_nonempty)
 {
     createAttributes();
-
-    try
-    {
-        loadData();
-        calculateBytesAllocated();
-    }
-    catch (...)
-    {
-        creation_exception = std::current_exception();
-    }
-
-    creation_time = std::chrono::system_clock::now();
+    loadData();
+    calculateBytesAllocated();
 }

View File

@@ -24,8 +24,6 @@ public:
         const DictionaryLifetime dict_lifetime,
         bool require_nonempty);

-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
     std::string getName() const override { return dictionary_name; }

     std::string getTypeName() const override { return "RangeHashed"; }
@@ -53,8 +51,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;
@@ -227,10 +223,6 @@ private:
     size_t element_count = 0;
     size_t bucket_count = 0;
     mutable std::atomic<size_t> query_count{0};
-
-    std::chrono::time_point<std::chrono::system_clock> creation_time;
-
-    std::exception_ptr creation_exception;
 };

 }

View File

@@ -33,8 +33,6 @@ public:

     std::string getKeyDescription() const { return key_description; }

-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
     std::string getName() const override { return name; }

     std::string getTypeName() const override { return "Trie"; }
@@ -62,8 +60,6 @@ public:
     const DictionaryStructure & getStructure() const override { return dict_struct; }

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-
     bool isInjective(const std::string & attribute_name) const override
     {
         return dict_struct.attributes[&getAttribute(attribute_name) - attributes.data()].injective;

View File

@@ -159,10 +159,14 @@ void AsynchronousMetrics::update()

         size_t max_part_count_for_partition = 0;

+        size_t number_of_databases = databases.size();
+        size_t total_number_of_tables = 0;
+
         for (const auto & db : databases)
         {
             for (auto iterator = db.second->getIterator(context); iterator->isValid(); iterator->next())
             {
+                ++total_number_of_tables;
                 auto & table = iterator->table();
                 StorageMergeTree * table_merge_tree = dynamic_cast<StorageMergeTree *>(table.get());
                 StorageReplicatedMergeTree * table_replicated_merge_tree = dynamic_cast<StorageReplicatedMergeTree *>(table.get());
@@ -213,6 +217,9 @@ void AsynchronousMetrics::update()
         set("ReplicasMaxRelativeDelay", max_relative_delay);

         set("MaxPartCountForPartition", max_part_count_for_partition);
+
+        set("NumberOfDatabases", number_of_databases);
+        set("NumberOfTables", total_number_of_tables);
     }

 #if USE_TCMALLOC

View File

@@ -504,20 +504,6 @@ std::shared_ptr<CatBoostLibHolder> getCatBoostWrapperHolder(const std::string &
 CatBoostModel::CatBoostModel(std::string name_, std::string model_path_, std::string lib_path_,
                              const ExternalLoadableLifetime & lifetime)
     : name(std::move(name_)), model_path(std::move(model_path_)), lib_path(std::move(lib_path_)), lifetime(lifetime)
-{
-    try
-    {
-        init();
-    }
-    catch (...)
-    {
-        creation_exception = std::current_exception();
-    }
-
-    creation_time = std::chrono::system_clock::now();
-}
-
-void CatBoostModel::init()
 {
     api_provider = getCatBoostWrapperHolder(lib_path);
     api = &api_provider->getAPI();

View File

@@ -68,9 +68,6 @@ public:

     std::shared_ptr<const IExternalLoadable> clone() const override;

-    std::chrono::time_point<std::chrono::system_clock> getCreationTime() const override { return creation_time; }
-    std::exception_ptr getCreationException() const override { return creation_exception; }
-
 private:
     std::string name;
     std::string model_path;
@@ -85,9 +82,6 @@ private:
     size_t cat_features_count;
     size_t tree_count;

-    std::chrono::time_point<std::chrono::system_clock> creation_time;
-    std::exception_ptr creation_exception;
-
     void init();
 };

View File

@@ -219,7 +219,7 @@ class ExternalLoader::LoadingDispatcher : private boost::noncopyable
 {
 public:
     /// Called to load or reload an object.
-    using CreateObjectFunction = std::function<ObjectWithException(
+    using CreateObjectFunction = std::function<LoadablePtr(
         const String & /* name */, const ObjectConfig & /* config */, bool config_changed, const LoadablePtr & /* previous_version */)>;

     /// Called after loading/reloading an object to calculate the time of the next update.
@@ -783,7 +783,7 @@ private:
             std::exception_ptr new_exception;
             try
             {
-                std::tie(new_object, new_exception) = create_object(name, config, config_changed, previous_version);
+                new_object = create_object(name, config, config_changed, previous_version);
             }
             catch (...)
             {
@@ -792,8 +792,6 @@ private:

             if (!new_object && !new_exception)
                 throw Exception("No object created and no exception raised for " + type_name, ErrorCodes::LOGICAL_ERROR);
-            if (new_object && new_exception)
-                new_object = nullptr;

             /// Calculate a new update time.
             TimePoint next_update_time;
@@ -1152,17 +1150,13 @@ void ExternalLoader::reload(bool load_never_loading)
     loading_dispatcher->reload(load_never_loading);
 }

-ExternalLoader::ObjectWithException ExternalLoader::createObject(
+ExternalLoader::LoadablePtr ExternalLoader::createObject(
     const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const
 {
     if (previous_version && !config_changed)
-    {
-        auto new_object = previous_version->clone();
-        return {new_object, new_object->getCreationException()};
-    }
+        return previous_version->clone();

-    auto new_object = create(name, *config.config, config.key_in_config);
-    return {new_object, new_object->getCreationException()};
+    return create(name, *config.config, config.key_in_config);
 }

 ExternalLoader::TimePoint ExternalLoader::calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const

View File

@@ -186,10 +186,8 @@ protected:
 private:
     struct ObjectConfig;

-    using ObjectWithException = std::pair<LoadablePtr, std::exception_ptr>;
-
-    ObjectWithException
-    createObject(const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const;
+    LoadablePtr createObject(const String & name, const ObjectConfig & config, bool config_changed, const LoadablePtr & previous_version) const;

     TimePoint calculateNextUpdateTime(const LoadablePtr & loaded_object, size_t error_count) const;

     class ConfigFilesReader;

View File

@@ -1,6 +1,5 @@
 #pragma once

-#include <chrono>
 #include <string>
 #include <memory>
 #include <boost/noncopyable.hpp>
@@ -41,10 +40,6 @@ public:
     virtual bool isModified() const = 0;
     /// Returns new object with the same configuration. Is used to update modified object when lifetime exceeded.
     virtual std::shared_ptr<const IExternalLoadable> clone() const = 0;
-
-    virtual std::chrono::time_point<std::chrono::system_clock> getCreationTime() const = 0;
-
-    virtual std::exception_ptr getCreationException() const = 0;
 };

 }
} }

View File

@@ -20,13 +20,13 @@ Block InternalTextLogsQueue::getSampleBlock()
 {
     return Block {
         {std::make_shared<DataTypeDateTime>(), "event_time"},
         {std::make_shared<DataTypeUInt32>(), "event_time_microseconds"},
         {std::make_shared<DataTypeString>(), "host_name"},
         {std::make_shared<DataTypeString>(), "query_id"},
         {std::make_shared<DataTypeUInt32>(), "thread_number"},
         {std::make_shared<DataTypeInt8>(), "priority"},
         {std::make_shared<DataTypeString>(), "source"},
         {std::make_shared<DataTypeString>(), "text"}
     };
 }

View File

@@ -101,6 +101,9 @@ BlockInputStreamPtr InterpreterDescribeQuery::executeImpl()

     for (const auto & column : columns)
     {
+        if (column.is_virtual)
+            continue;
+
         res_columns[0]->insert(column.name);
         res_columns[1]->insert(column.type->getName());

View File

@@ -44,6 +44,9 @@ class Client:
         return self.get_query_request(sql, stdin=stdin, timeout=timeout, settings=settings, user=user).get_error()

+    def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
+        return self.get_query_request(sql, stdin=stdin, timeout=timeout, settings=settings, user=user).get_answer_and_error()
+

 class QueryTimeoutExceedException(Exception):
     pass
@@ -110,3 +113,17 @@ class CommandRequest:
             raise QueryRuntimeException('Client expected to be failed but succeeded! stdout: {}'.format(stdout))

         return stderr
+
+    def get_answer_and_error(self):
+        self.process.wait()
+        self.stdout_file.seek(0)
+        self.stderr_file.seek(0)
+
+        stdout = self.stdout_file.read()
+        stderr = self.stderr_file.read()
+
+        if self.timer is not None and not self.process_finished_before_timeout and not self.ignore_error:
+            raise QueryTimeoutExceedException('Client timed out!')
+
+        return (stdout, stderr)

View File

@@ -527,6 +527,10 @@ class ClickHouseInstance:
     def query_and_get_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
         return self.client.query_and_get_error(sql, stdin, timeout, settings, user)

+    # The same as query_and_get_error but ignores successful query.
+    def query_and_get_answer_with_error(self, sql, stdin=None, timeout=None, settings=None, user=None):
+        return self.client.query_and_get_answer_with_error(sql, stdin, timeout, settings, user)
+
     # Connects to the instance via HTTP interface, sends a query and returns the answer
     def http_query(self, sql, data=None):
         return urllib.urlopen("http://"+self.ip_address+":8123/?query="+urllib.quote(sql,safe=''), data).read()

View File

@@ -0,0 +1,26 @@
+<?xml version="1.0"?>
+<yandex>
+    <logger>
+        <level>information</level>
+        <log>/var/log/clickhouse-server/clickhouse-server.log</log>
+        <errorlog>/var/log/clickhouse-server/clickhouse-server.err.log</errorlog>
+        <size>1000M</size>
+        <count>10</count>
+    </logger>
+
+    <openSSL>
+        <client>
+            <cacheSessions>true</cacheSessions>
+            <verificationMode>none</verificationMode>
+            <invalidCertificateHandler>
+                <name>AcceptCertificateHandler</name>
+            </invalidCertificateHandler>
+        </client>
+    </openSSL>
+
+    <max_concurrent_queries>500</max_concurrent_queries>
+    <mark_cache_size>5368709120</mark_cache_size>
+
+    <users_config>users.xml</users_config>
+    <dictionaries_config>/etc/clickhouse-server/config.d/*.xml</dictionaries_config>
+</yandex>

View File

@ -0,0 +1,18 @@
import pytest
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance('node', main_configs=['configs/config_information.xml'])
@pytest.fixture(scope="module")
def start_cluster():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def test_check_client_logs_level(start_cluster):
logs = node.query_and_get_answer_with_error("SELECT 1", settings={"send_logs_level": 'trace'})[1]
assert logs.count('Trace') != 0

View File

@ -13,11 +13,11 @@
<create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query> <create_query>CREATE TABLE ints (i64 Int64, i32 Int32, i16 Int16, i8 Int8) ENGINE = Memory</create_query>
<fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query> <fill_query>INSERT INTO ints SELECT number AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query> <fill_query>INSERT INTO ints SELECT 10000 + number % 1000 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query> <fill_query>INSERT INTO ints SELECT 20000 + number % 100 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query> <fill_query>INSERT INTO ints SELECT 30000 + number % 10 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(10000)</fill_query> <fill_query>INSERT INTO ints SELECT 40000 + number % 1 AS i64, i64 AS i32, i64 AS i16, i64 AS i8 FROM numbers(5000)</fill_query>
<query tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 200042</query> <query tag='ANY LEFT'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64 WHERE i32 = 200042</query>
<query tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042</query> <query tag='ANY LEFT KEY'>SELECT COUNT() FROM ints l ANY LEFT JOIN ints r USING i64,i32,i16,i8 WHERE i32 = 200042</query>

View File

@ -0,0 +1,7 @@
SET max_memory_usage = 1000000000;
SELECT sum(ignore(*)) FROM (
SELECT number, argMax(number, (number, toFixedString(toString(number), 1024)))
FROM numbers(1000000)
GROUP BY number
) -- { serverError 241 }
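-- Error 241 is MEMORY_LIMIT_EXCEEDED: the query is expected to hit the 1 GB max_memory_usage limit set above.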

View File

@ -0,0 +1,24 @@
1
1
1
1
1
1
<Debug>
<Trace>
<Debug>
<Information>
<Debug>
<Debug>
<Debug>
<Information>
<Debug>
<Information>

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="trace" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="debug" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="information" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="warning" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="error" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
clickhouse-client --send_logs_level="none" --query="SELECT 1;" 2>> 00965_logs_level_bugfix.tmp
awk '{ print $8 }' 00965_logs_level_bugfix.tmp
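# In each log line written by the client, the eighth whitespace-separated field is the bracketed
# priority (e.g. <Debug>), so this prints one level tag per line for comparison with the reference.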

View File

@ -0,0 +1,12 @@
<Debug>
<Trace>
<Debug>
<Information>
<Debug>
*****
<Information>

View File

@ -0,0 +1,17 @@
#!/usr/bin/env bash
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
. $CURDIR/../shell_config.sh
> 00965_send_logs_level_concurrent_queries_first.tmp
> 00965_send_logs_level_concurrent_queries_second.tmp
clickhouse-client --send_logs_level="trace" --query="SELECT * from numbers(100000);" >> /dev/null 2>> 00965_send_logs_level_concurrent_queries_first.tmp &
clickhouse-client --send_logs_level="information" --query="SELECT * from numbers(100000);" >> /dev/null 2>> 00965_send_logs_level_concurrent_queries_second.tmp
sleep 2
awk '{ print $8 }' 00965_send_logs_level_concurrent_queries_first.tmp
echo "*****"
awk '{ print $8 }' 00965_send_logs_level_concurrent_queries_second.tmp

View File

@ -0,0 +1 @@
x UInt64

View File

@ -0,0 +1,9 @@
-- No virtual columns should be output in DESC TABLE query.
DROP TABLE IF EXISTS upyachka;
CREATE TABLE upyachka (x UInt64) ENGINE = Memory;
-- Merge table has virtual column `_table`
DESC TABLE merge(currentDatabase(), 'upyachka');
DROP TABLE upyachka;

View File

@ -1,3 +1,3 @@
Template: clickhouse-server/default-password Template: clickhouse-server/default-password
Type: password Type: password
Description: Password for default user. Description: Password for default user

View File

@ -514,7 +514,7 @@ Use the following parameters to configure logging:
``` ```
## path ## path {#server_settings-path}
The path to the directory containing data. The path to the directory containing data.

View File

@ -71,7 +71,7 @@ $ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64
## Details of Implementation ## Details of Implementation
- Multiple SELECT queries can be performed concurrently, but INSERT queries will wait each other. - Multiple `SELECT` queries can be performed concurrently, but `INSERT` queries will wait for each other.
- Not supported: - Not supported:
- `ALTER` - `ALTER`
- `SELECT ... SAMPLE` - `SELECT ... SAMPLE`

View File

@ -22,13 +22,21 @@ Example 2: `uniqArray(arr)` Count the number of unique elements in all 'arr'
## -State ## -State
If you apply this combinator, the aggregate function doesn't return the resulting value (such as the number of unique values for the `uniq` function), but an intermediate state of the aggregation (for `uniq`, this is the hash table for calculating the number of unique values). This is an AggregateFunction(...) that can be used for further processing or stored in a table to finish aggregating later. To work with these states, use the [AggregatingMergeTree](../../operations/table_engines/aggregatingmergetree.md) table engine, the functions [`finalizeAggregation`](../functions/other_functions.md#finalizeaggregation) and [`runningAccumulate`](../functions/other_functions.md#function-runningaccumulate), and the combinators -Merge and -MergeState described below. If you apply this combinator, the aggregate function doesn't return the resulting value (such as the number of unique values for the [uniq](reference.md#agg_function-uniq) function), but an intermediate state of the aggregation (for `uniq`, this is the hash table for calculating the number of unique values). This is an `AggregateFunction(...)` that can be used for further processing or stored in a table to finish aggregating later.
## -Merge To work with these states, use:
- [AggregatingMergeTree](../../operations/table_engines/aggregatingmergetree.md) table engine.
- [finalizeAggregation](../functions/other_functions.md#function-finalizeaggregation) function.
- [runningAccumulate](../functions/other_functions.md#function-runningaccumulate) function.
- [-Merge](#aggregate_functions_combinators_merge) combinator.
- [-MergeState](#aggregate_functions_combinators_mergestate) combinator.
## -Merge {#aggregate_functions_combinators_merge}
If you apply this combinator, the aggregate function takes the intermediate aggregation state as an argument, combines the states to finish aggregation, and returns the resulting value. If you apply this combinator, the aggregate function takes the intermediate aggregation state as an argument, combines the states to finish aggregation, and returns the resulting value.
## -MergeState. ## -MergeState {#aggregate_functions_combinators_mergestate}
Merges the intermediate aggregation states in the same way as the -Merge combinator. However, it doesn't return the resulting value, but an intermediate aggregation state, similar to the -State combinator. Merges the intermediate aggregation states in the same way as the -Merge combinator. However, it doesn't return the resulting value, but an intermediate aggregation state, similar to the -State combinator.

View File

@ -107,22 +107,26 @@ Besides default data compression, defined in [server settings](../operations/ser
Supported compression algorithms: Supported compression algorithms:
- `NONE` - no compression for data applied - `NONE` — No compression.
- `LZ4` - `LZ4` — Lossless [data compression algorithm](https://github.com/lz4/lz4) used by default. Applies LZ4 fast compression.
- `LZ4HC(level)` - (level) - LZ4\_HC compression algorithm with defined level. - `LZ4HC[(level)]` — LZ4 HC (high compression) algorithm with configurable level. Default level: 9. If you set `level <= 0`, the default level is applied. Possible levels: [1, 12]. Recommended levels are in range: [4, 9].
Possible `level` range: \[3, 12\]. Default value: 9. Greater values stands for better compression and higher CPU usage. Recommended value range: [4,9]. - `ZSTD[(level)]` — [ZSTD compression algorithm](https://en.wikipedia.org/wiki/Zstandard) with configurable `level`. Possible levels: [1, 22]. Default value: 1.
- `ZSTD(level)` - ZSTD compression algorithm with defined `level`. Possible `level` value range: \[1, 22\]. Default value: 1. - `Delta(delta_bytes)` — compression approach in which raw values are replaced with the difference of two neighbouring values. Up to `delta_bytes` are used for storing the delta value, so `delta_bytes` is the maximum size of raw values.
Greater values stands for better compression and higher CPU usage. Possible `delta_bytes` values: 1, 2, 4, 8. Default value for `delta_bytes` is `sizeof(type)` if it equals 1, 2, 4, or 8; otherwise it is 1.
- `Delta(delta_bytes)` - compression approach when raw values are replace with difference of two neighbour values. Up to `delta_bytes` are used for storing delta value. - `DoubleDelta` — Compresses values down to 1 bit (in the best case), using delta-of-deltas calculation. Best compression rates are achieved on monotonic sequences with a constant stride, for example, time series data. Can be used with any fixed-width type. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. Uses 1 extra bit for 32-byte deltas: a 5-bit prefix instead of a 4-bit prefix. For additional information, see the "Compressing time stamps" section of the [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) document.
Possible `delta_bytes` values: 1, 2, 4, 8. Default value for delta bytes is `sizeof(type)`, if it is equals to 1, 2, 4, 8 and equals to 1 otherwise. - `Gorilla` — Compresses values down to 1 bit (in the best case). The codec is efficient when storing series of floating-point values that change slowly, because the best compression rate is achieved when neighbouring values are binary equal. Implements the algorithm used in Gorilla TSDB, extending it to support 64-bit types. For additional information, see the "Compressing values" section of the [Gorilla: A Fast, Scalable, In-Memory Time Series Database](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf) document.
- `DoubleDelta` - stores delta of deltas in compact binary form, compressing values down to 1 bit (in the best case). Best compression rates are achieved on monotonic sequences with constant stride, e.g. time samples. Can be used against any fixed-width type. Implementation is based on [Gorilla paper](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf), and extended to support 64bit types. The drawback is 1 extra bit for 32-byte wide deltas: 5-bit prefix instead of 4-bit prefix.
- `Gorilla` - stores (parts of) xored values in compact binary form, compressing values down to 1 bit (in the best case). Best compression rate is achieved when neighbouring values are binary equal. Basic use case - floating point data that do not change rapidly. Implementation is based on [Gorilla paper](http://www.vldb.org/pvldb/vol8/p1816-teller.pdf), and extended to support 64bit types. High compression levels useful for asymmetric scenarios, like compress once, decompress a lot of times. Greater levels stands for better compression and higher CPU usage.
!!! warning
You cannot decompress ClickHouse database files with external utilities, for example, `lz4`. Use the special utility [clickhouse-compressor](https://github.com/yandex/ClickHouse/tree/master/dbms/programs/compressor).
Syntax example: Syntax example:
``` ```
CREATE TABLE codec_example CREATE TABLE codec_example
( (
dt Date CODEC(ZSTD), /* используется уровень сжатия по-умолчанию */ dt Date CODEC(ZSTD),
ts DateTime CODEC(LZ4HC), ts DateTime CODEC(LZ4HC),
float_value Float32 CODEC(NONE), float_value Float32 CODEC(NONE),
double_value Float64 CODEC(LZ4HC(9)) double_value Float64 CODEC(LZ4HC(9))
@ -134,6 +138,7 @@ ORDER BY dt
Codecs can be combined in a pipeline. The default table codec is not included in the pipeline (if it should be applied to a column, you have to specify it explicitly in the pipeline). The example below shows an optimization approach for storing time-series metrics. Codecs can be combined in a pipeline. The default table codec is not included in the pipeline (if it should be applied to a column, you have to specify it explicitly in the pipeline). The example below shows an optimization approach for storing time-series metrics.
Usually, values for a particular metric stored in `path` do not differ significantly from point to point. Using delta encoding reduces disk space usage significantly. Usually, values for a particular metric stored in `path` do not differ significantly from point to point. Using delta encoding reduces disk space usage significantly.
``` ```
CREATE TABLE timeseries_example CREATE TABLE timeseries_example
( (

View File

@ -96,7 +96,7 @@ LIMIT 3
Checks whether the dictionary has the key. Checks whether the dictionary has the key.
``` ```
dictHas('dict_name', id) dictHas('dict_name', id_expr)
``` ```
**Parameters** **Parameters**
@ -116,7 +116,7 @@ Type: `UInt8`.
For the hierarchical dictionary, returns an array of dictionary keys starting from passed `id_expr` and continuing along the chain of parent elements. For the hierarchical dictionary, returns an array of dictionary keys starting from passed `id_expr` and continuing along the chain of parent elements.
``` ```
dictGetHierarchy('dict_name', id) dictGetHierarchy('dict_name', id_expr)
``` ```
**Parameters** **Parameters**

View File

@ -151,4 +151,36 @@ SELECT geohashDecode('ezs42') AS res
└─────────────────────────────────┘ └─────────────────────────────────┘
``` ```
## geoToH3
Calculates the [H3](https://uber.github.io/h3/#/documentation/overview/introduction) index of the point `(lon, lat)` with the specified resolution.
```
geoToH3(lon, lat, resolution)
```
**Input values**
- `lon` — Longitude. Type: [Float64](../../data_types/float.md).
- `lat` — Latitude. Type: [Float64](../../data_types/float.md).
- `resolution` — Index resolution. Range: `[0, 15]`. Type: [UInt8](../../data_types/int_uint.md).
**Returned values**
- Hexagon index number.
- 0 in case of error.
Type: [UInt64](../../data_types/int_uint.md).
**Example**
``` sql
SELECT geoToH3(37.79506683, 55.71290588, 15) as h3Index
```
```
┌────────────h3Index─┐
│ 644325524701193974 │
└────────────────────┘
```
[Original article](https://clickhouse.yandex/docs/en/query_language/functions/geo/) <!--hide--> [Original article](https://clickhouse.yandex/docs/en/query_language/functions/geo/) <!--hide-->

View File

@ -627,15 +627,36 @@ SELECT replicate(1, ['a', 'b', 'c'])
└───────────────────────────────┘ └───────────────────────────────┘
``` ```
## filesystemAvailable ## filesystemAvailable {#function-filesystemavailable}
Returns the remaining space information of the disk, in bytes. This information is evaluated using the configured by path. Returns the amount of remaining space in the filesystem where the database files are located. See the [path](../../operations/server_settings/settings.md#server_settings-path) server setting description.
```
filesystemAvailable()
```
**Returned values**
- Amount of remaining space in bytes.
Type: [UInt64](../../data_types/int_uint.md).
**Example**
```sql
SELECT filesystemAvailable() AS "Free space", toTypeName(filesystemAvailable()) AS "Type"
```
```text
┌──Free space─┬─Type───┐
│ 18152624128 │ UInt64 │
└─────────────┴────────┘
```
## filesystemCapacity ## filesystemCapacity
Returns the capacity of the disk, in bytes. This information is evaluated using the configured `path`. Returns the capacity of the disk, in bytes. This information is evaluated using the configured `path`.
## finalizeAggregation ## finalizeAggregation {#function-finalizeaggregation}
Takes a state of an aggregate function. Returns the result of aggregation (the finalized state). Takes a state of an aggregate function. Returns the result of aggregation (the finalized state).

View File

@ -101,11 +101,11 @@ CREATE TABLE lineorder_flat
ENGINE = MergeTree ENGINE = MergeTree
PARTITION BY toYear(LO_ORDERDATE) PARTITION BY toYear(LO_ORDERDATE)
ORDER BY (LO_ORDERDATE, LO_ORDERKEY) AS ORDER BY (LO_ORDERDATE, LO_ORDERKEY) AS
SELECT * SELECT l.*, c.*, s.*, p.*
FROM lineorder FROM lineorder l
ANY INNER JOIN customer ON LO_CUSTKEY = C_CUSTKEY ANY INNER JOIN customer c ON (c.C_CUSTKEY = l.LO_CUSTKEY)
ANY INNER JOIN supplier ON LO_SUPPKEY = S_SUPPKEY ANY INNER JOIN supplier s ON (s.S_SUPPKEY = l.LO_SUPPKEY)
ANY INNER JOIN part ON LO_PARTKEY = P_PARTKEY; ANY INNER JOIN part p ON (p.P_PARTKEY = l.LO_PARTKEY);
ALTER TABLE lineorder_flat DROP COLUMN C_CUSTKEY, DROP COLUMN S_SUPPKEY, DROP COLUMN P_PARTKEY; ALTER TABLE lineorder_flat DROP COLUMN C_CUSTKEY, DROP COLUMN S_SUPPKEY, DROP COLUMN P_PARTKEY;
``` ```

View File

@ -167,6 +167,8 @@ clickhouse-client --format_csv_delimiter="|" --query="INSERT INTO test.csv FORMA
`NULL` is formatted as `\N` or `NULL` or an empty unquoted string (see the [format_csv_unquoted_null_literal_as_null](../operations/settings/settings.md#settings-format_csv_unquoted_null_literal_as_null) and [input_format_defaults_for_omitted_fields](../operations/settings/settings.md#settings-input_format_defaults_for_omitted_fields) settings). `NULL` is formatted as `\N` or `NULL` or an empty unquoted string (see the [format_csv_unquoted_null_literal_as_null](../operations/settings/settings.md#settings-format_csv_unquoted_null_literal_as_null) and [input_format_defaults_for_omitted_fields](../operations/settings/settings.md#settings-input_format_defaults_for_omitted_fields) settings).
If the [input_format_defaults_for_omitted_fields = 1](../operations/settings/settings.md#session_settings-input_format_defaults_for_omitted_fields) setting is enabled and the column type is not `Nullable(T)`, unquoted empty values are replaced with the default values for the column's data type.
The CSV format supports the output of totals and extremes the same way as `TabSeparated`. The CSV format supports the output of totals and extremes the same way as `TabSeparated`.
## CSVWithNames ## CSVWithNames

View File

@ -514,7 +514,7 @@ ClickHouse checks the `min_part_size` and `min_part_size_rat
``` ```
## path ## path {#server_settings-path}
The path to the directory containing data. The path to the directory containing data.

View File

@ -181,20 +181,15 @@ Ok.
## input_format_defaults_for_omitted_fields {#session_settings-input_format_defaults_for_omitted_fields} ## input_format_defaults_for_omitted_fields {#session_settings-input_format_defaults_for_omitted_fields}
Enables/disables extended data exchange between the ClickHouse client and the ClickHouse server. The setting applies to `INSERT` queries. When inserting data with an `INSERT` query, replaces omitted fields with the default values of the column's data type.
When executing an `INSERT` query, the ClickHouse client prepares the data and sends it to the server for writing. While preparing the data, the client receives the table structure from the server. In some cases, the client needs more information than the server sends by default. Enable extended data exchange with the `input_format_defaults_for_omitted_fields = 1` setting. Supported insert formats:
If extended data exchange is enabled, the server sends additional metadata along with the table structure. The composition of the metadata depends on the operation. - [JSONEachRow](../../interfaces/formats.md#jsoneachrow)
- [CSV](../../interfaces/formats.md#csv)
Operations for which you may need to enable extended data exchange:
- Inserting data in the [JSONEachRow](../../interfaces/formats.md#jsoneachrow) format.
ClickHouse doesn't apply the setting to any other operations.
!!! note "Note" !!! note "Note"
The extended data exchange functionality consumes additional computing resources on the server and can reduce performance. When the option is enabled, the server sends extended metadata to the client. This requires additional computing resources on the server and can reduce performance.
Possible values: Possible values:

View File

@ -68,7 +68,7 @@ $ echo -e "1,2\n3,4" | clickhouse-local -q "CREATE TABLE table (a Int64, b Int64
## Details of Implementation ## Details of Implementation
- Multi-threaded reads and single-threaded writes are supported. - Multiple `SELECT` queries can be performed concurrently; `INSERT` queries can only be executed sequentially.
- Not supported: - Not supported:
- `ALTER` and `SELECT...SAMPLE` operations; - `ALTER` and `SELECT...SAMPLE` operations;
- indexes; - indexes;

View File

@ -25,7 +25,7 @@ SETTINGS
[kafka_row_delimiter = 'delimiter_symbol',] [kafka_row_delimiter = 'delimiter_symbol',]
[kafka_schema = '',] [kafka_schema = '',]
[kafka_num_consumers = N,] [kafka_num_consumers = N,]
[kafka_skip_broken_messages = <0|1>] [kafka_skip_broken_messages = N]
``` ```
Required parameters: Required parameters:
@ -40,7 +40,7 @@ SETTINGS
- `kafka_row_delimiter`: the record (row) delimiter character that terminates a message. - `kafka_row_delimiter`: the record (row) delimiter character that terminates a message.
- `kafka_schema`: an optional parameter that is required if the format needs a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root object `schema.capnp:Message`. - `kafka_schema`: an optional parameter that is required if the format needs a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root object `schema.capnp:Message`.
- `kafka_num_consumers`: the number of consumers per table. Default: `1`. Specify more consumers if the throughput of a single consumer is insufficient. The total number of consumers must not exceed the number of partitions in the topic, since at most one consumer can be assigned per partition. - `kafka_num_consumers`: the number of consumers per table. Default: `1`. Specify more consumers if the throughput of a single consumer is insufficient. The total number of consumers must not exceed the number of partitions in the topic, since at most one consumer can be assigned per partition.
- `kafka_skip_broken_messages`: the Kafka message handling mode. If `kafka_skip_broken_messages = 1`, the engine discards Kafka messages that cannot be parsed. One message corresponds to exactly one record (row). - `kafka_skip_broken_messages`: the maximum number of malformed messages per block. If `kafka_skip_broken_messages = N`, the engine discards `N` Kafka messages that cannot be parsed. One message corresponds to exactly one record (row). Default value: 0.
Examples Examples

View File

@ -23,13 +23,22 @@
## -State ## -State
If you apply this combinator, the aggregate function returns not the finished value (for example, for the `uniq` function, the number of unique values), but an intermediate state of the aggregation (for `uniq`, the hash table for calculating the number of unique values), which has the type AggregateFunction(...) and can be used for further processing or can be stored in a table to finish aggregating later; see the sections "AggregatingMergeTree" and "Functions for working with intermediate aggregation states". If you apply this combinator, the aggregate function returns not the finished value (for example, for the [uniq](reference.md#agg_function-uniq) function, the number of unique values), but an intermediate state of the aggregation (for `uniq`, the hash table for calculating the number of unique values), which has the type `AggregateFunction(...)` and can be used for further processing or can be stored in a table to finish aggregating later.
## -Merge To work with these intermediate states, use:
- The [AggregatingMergeTree](../../operations/table_engines/aggregatingmergetree.md) table engine.
- The [finalizeAggregation](../functions/other_functions.md#function-finalizeaggregation) function.
- The [runningAccumulate](../functions/other_functions.md#function-runningaccumulate) function.
- The [-Merge](#aggregate_functions_combinators_merge) combinator.
- The [-MergeState](#aggregate_functions_combinators_mergestate) combinator.
## -Merge {#aggregate_functions_combinators_merge}
If you apply this combinator, the aggregate function takes the intermediate aggregation state as an argument, combines the states to finish aggregation, and returns the resulting value. If you apply this combinator, the aggregate function takes the intermediate aggregation state as an argument, combines the states to finish aggregation, and returns the resulting value.
## -MergeState. ## -MergeState {#aggregate_functions_combinators_mergestate}
Merges the intermediate aggregation states in the same way as the -Merge combinator, but returns not the resulting value, but an intermediate aggregation state, similar to the -State combinator. Merges the intermediate aggregation states in the same way as the -Merge combinator, but returns not the resulting value, but an intermediate aggregation state, similar to the -State combinator.

View File

@ -1,40 +1,192 @@
# Functions for Working with External Dictionaries {#ext_dict_functions} # Functions for Working with External Dictionaries {#ext_dict_functions}
For information on connecting and configuring external dictionaries, see the section [External dictionaries](../dicts/external_dicts.md). For information on connecting and configuring, read the section on [external dictionaries](../dicts/external_dicts.md).
## dictGetUInt8, dictGetUInt16, dictGetUInt32, dictGetUInt64 ## dictGet
## dictGetInt8, dictGetInt16, dictGetInt32, dictGetInt64 Retrieves a value from an external dictionary.
## dictGetFloat32, dictGetFloat64 ```
dictGet('dict_name', 'attr_name', id_expr)
dictGetOrDefault('dict_name', 'attr_name', id_expr, default_value_expr)
```
## dictGetDate, dictGetDateTime **Parameters**
## dictGetUUID - `dict_name`: name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `attr_name`: name of the dictionary column. [String literal](../syntax.md#syntax-string-literal).
- `id_expr`: key value. An [expression](../syntax.md#syntax-expressions) returning a value of the [UInt64](../../data_types/int_uint.md) or [Tuple](../../data_types/tuple.md) type, depending on the dictionary configuration.
- `default_value_expr`: the value returned if the dictionary doesn't contain a row with the `id_expr` key. An [expression](../syntax.md#syntax-expressions) returning a value of the same type as the `attr_name` attribute.
## dictGetString **Returned value**
`dictGetT('dict_name', 'attr_name', id)`
- Gets the value of the `attr_name` attribute from the `dict_name` dictionary using the `id` key.
`dict_name` and `attr_name` are constant strings.
`id` must be of the UInt64 type.
If the `id` key is not in the dictionary, the default value specified in the dictionary description is returned.
## dictGetTOrDefault - If ClickHouse parses the attribute successfully according to the specified [data type](../dicts/external_dicts_dict_structure.md#ext_dict_structure-attributes), the function returns the value for the given `id_expr` key.
- If the requested `id_expr` is not found in the dictionary:
`dictGetT('dict_name', 'attr_name', id, default)` - `dictGet` returns the content of the `<null_value>` element defined in the dictionary configuration.
- `dictGetOrDefault` returns the value passed as the `default_value_expr` parameter.
Similar to the `dictGetT` functions, but the default value is taken from the function's last argument. ClickHouse throws an exception if it cannot parse the attribute's value, or if the value doesn't match the attribute's data type.
## dictIsIn **Usage example**
`dictIsIn('dict_name', child_id, ancestor_id)`
- For the `dict_name` hierarchical dictionary, checks whether the `child_id` key is located inside `ancestor_id` (or coincides with `ancestor_id`). Returns UInt8. Create a file `ext-dict-test.csv` with the following content:
```text
1,1
2,2
```
The first column is `id`, the second is `c1`.
Configuration of the external dictionary:
```xml
<yandex>
<dictionary>
<name>ext-dict-test</name>
<source>
<file>
<path>/path-to/ext-dict-test.csv</path>
<format>CSV</format>
</file>
</source>
<layout>
<flat />
</layout>
<structure>
<id>
<name>id</name>
</id>
<attribute>
<name>c1</name>
<type>UInt32</type>
<null_value></null_value>
</attribute>
</structure>
<lifetime>0</lifetime>
</dictionary>
</yandex>
```
Run the query:
```sql
SELECT
dictGetOrDefault('ext-dict-test', 'c1', number + 1, toUInt32(number * 10)) AS val,
toTypeName(val) AS Type
FROM system.numbers
LIMIT 3
```
```text
┌─val─┬─Type───┐
│ 1 │ UInt32 │
│ 2 │ UInt32 │
│ 20 │ UInt32 │
└─────┴────────┘
```
**See Also**
- [External dictionaries](../dicts/external_dicts.md)
## dictGetHierarchy
`dictGetHierarchy('dict_name', id)`
- For the `dict_name` hierarchical dictionary, returns an array of dictionary keys starting from `id` and continuing along the chain of parent elements. Returns Array(UInt64).
## dictHas ## dictHas
`dictHas('dict_name', id)`
- Checks whether a key is present in the dictionary. Returns a UInt8 value equal to 0 if the key is missing and 1 if it exists. Checks whether a record with the given key exists in the dictionary.
```
dictHas('dict_name', id_expr)
```
**Parameters**
- `dict_name`: name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `id_expr`: key value. An [expression](../syntax.md#syntax-expressions) returning a [UInt64](../../data_types/int_uint.md) value.
**Returned value**
- 0, if the key was not found.
- 1, if the key is present in the dictionary.
Type: `UInt8`.
## dictGetHierarchy
For hierarchical dictionaries, returns an array of keys that contains the `id_expr` key and all the keys of parent elements along the chain.
```
dictGetHierarchy('dict_name', id_expr)
```
**Parameters**
- `dict_name`: name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `id_expr`: key value. An [expression](../syntax.md#syntax-expressions) returning a [UInt64](../../data_types/int_uint.md) value.
**Returned value**
The hierarchy of dictionary keys.
Type: [Array(UInt64)](../../data_types/array.md).
## dictIsIn
Checks whether a key is an ancestor of another key through the whole hierarchical chain of the dictionary.
`dictIsIn('dict_name', child_id_expr, ancestor_id_expr)`
**Parameters**
- `dict_name`: name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `child_id_expr`: the key to be checked. An [expression](../syntax.md#syntax-expressions) returning a [UInt64](../../data_types/int_uint.md) value.
- `ancestor_id_expr`: the alleged ancestor of the `child_id_expr` key. An [expression](../syntax.md#syntax-expressions) returning a [UInt64](../../data_types/int_uint.md) value.
**Returned value**
- 0, if `child_id_expr` is not a child of `ancestor_id_expr`.
- 1, if `child_id_expr` is a child of `ancestor_id_expr`, or if `child_id_expr` equals `ancestor_id_expr`.
Type: `UInt8`.
## Other Functions {#ext_dict_functions-other}
ClickHouse supports specialized functions that convert dictionary attribute values to a specific data type regardless of the dictionary configuration.
Functions:
- `dictGetInt8`, `dictGetInt16`, `dictGetInt32`, `dictGetInt64`
- `dictGetUInt8`, `dictGetUInt16`, `dictGetUInt32`, `dictGetUInt64`
- `dictGetFloat32`, `dictGetFloat64`
- `dictGetDate`
- `dictGetDateTime`
- `dictGetUUID`
- `dictGetString`
All these functions also have an `OrDefault` version. For example, `dictGetDateOrDefault`.
Syntax:
```
dictGet[Type]('dict_name', 'attr_name', id_expr)
dictGet[Type]OrDefault('dict_name', 'attr_name', id_expr, default_value_expr)
```
**Parameters**
- `dict_name`: name of the dictionary. [String literal](../syntax.md#syntax-string-literal).
- `attr_name`: name of the dictionary column. [String literal](../syntax.md#syntax-string-literal).
- `id_expr`: key value. An [expression](../syntax.md#syntax-expressions) returning a [UInt64](../../data_types/int_uint.md) value.
- `default_value_expr`: the value returned if the dictionary doesn't contain a row with the `id_expr` key. An [expression](../syntax.md#syntax-expressions) returning a value of the same type as the `attr_name` attribute.
**Returned value**
- If ClickHouse parses the attribute successfully according to the specified [data type](../dicts/external_dicts_dict_structure.md#ext_dict_structure-attributes), the function returns the value for the given `id_expr` key.
- If the requested `id_expr` is not found in the dictionary:
- `dictGet[Type]` returns the content of the `<null_value>` element defined in the dictionary configuration.
- `dictGet[Type]OrDefault` returns the value passed as the `default_value_expr` parameter.
ClickHouse throws an exception if it cannot parse the attribute's value, or if the value doesn't match the attribute's data type.
[Original article](https://clickhouse.yandex/docs/ru/query_language/functions/ext_dict_functions/) <!--hide--> [Original article](https://clickhouse.yandex/docs/ru/query_language/functions/ext_dict_functions/) <!--hide-->

View File

@ -132,13 +132,17 @@ SELECT geohashEncode(-5.60302734375, 42.593994140625, 0) AS res
Decodes any geohash-encoded string into longitude and latitude. Decodes any geohash-encoded string into longitude and latitude.
```
geohashDecode(geohash_string)
```
**Input values** **Input values**
- encoded string: a string containing a geohash. - `geohash_string`: a string containing a geohash.
**Returned values** **Returned values**
- (longitude, latitude): longitude and latitude. A tuple of two `Float64` values. - `(longitude, latitude)`: longitude and latitude. A tuple of two `Float64` values.
**Example** **Example**
@ -154,7 +158,7 @@ SELECT geohashDecode('ezs42') AS res
## geoToH3 ## geoToH3
Gets the H3 index of the point (lon, lat) with the specified resolution. Gets the H3 index of the point `(lon, lat)` with the specified resolution.
``` ```
geoToH3(lon, lat, resolution) geoToH3(lon, lat, resolution)
@ -162,15 +166,16 @@ geoToH3(lon, lat, resolution)
**Input values** **Input values**
- `lon` - geographic longitude. Data type: [Float64](../../data_types/float.md). - `lon`: geographic longitude. Data type: [Float64](../../data_types/float.md).
- `lat` - geographic latitude. Data type: [Float64](../../data_types/float.md). - `lat`: geographic latitude. Data type: [Float64](../../data_types/float.md).
- `resolution` - required index resolution. Data type: [UInt8](../../data_types/int_uint.md). Range of possible values: `[0, 15]`. - `resolution`: required index resolution. Data type: [UInt8](../../data_types/int_uint.md). Range of possible values: `[0, 15]`.
**Returned values** **Returned values**
Returns a value of the [UInt64](../../data_types/int_uint.md) type. - Hexagon index number.
`0` in case of error. - 0 in case of error.
Otherwise, the hexagon index number is returned.
Type: [UInt64](../../data_types/int_uint.md).
**Example** **Example**

View File

@ -600,6 +600,39 @@ SELECT replicate(1, ['a', 'b', 'c'])
└───────────────────────────────┘ └───────────────────────────────┘
``` ```
## filesystemAvailable {#function-filesystemavailable}
Returns the amount of remaining space in the filesystem where the database files are located. See the description of the [path](../../operations/server_settings/settings.md#server_settings-path) server configuration parameter.
```
filesystemAvailable()
```
**Returned value**
- The amount of free space.
Type: [UInt64](../../data_types/int_uint.md).
**Example**
```sql
SELECT filesystemAvailable() AS "Free space", toTypeName(filesystemAvailable()) AS "Type"
```
```text
┌──Free space─┬─Type───┐
│ 18152624128 │ UInt64 │
└─────────────┴────────┘
```
## filesystemCapacity
Returns the capacity of the disk.
## finalizeAggregation {#function-finalizeaggregation}
Takes the state of an aggregate function. Returns the result of aggregation (the finalized state).
## runningAccumulate {#function-runningaccumulate} ## runningAccumulate {#function-runningaccumulate}
Takes states of an aggregate function and returns a column with values that are the result of merging those states over a set of rows in the block, from the first row to the current one. For example, it takes the state of an aggregate function (e.g. `runningAccumulate(uniqState(UserID))`) and, for each row of the block, returns the result of the aggregate function after merging the states for all previous rows and the current one. Thus, the result depends on how the data is split into blocks and on the order of the data within a block. Takes states of an aggregate function and returns a column with values that are the result of merging those states over a set of rows in the block, from the first row to the current one. For example, it takes the state of an aggregate function (e.g. `runningAccumulate(uniqState(UserID))`) and, for each row of the block, returns the result of the aggregate function after merging the states for all previous rows and the current one. Thus, the result depends on how the data is split into blocks and on the order of the data within a block.

View File

@ -637,7 +637,7 @@ SELECT replicate(1, ['a', 'b', 'c'])
Returns the capacity of the disk, in bytes. This information is evaluated using the path configured in the config file. Returns the capacity of the disk, in bytes. This information is evaluated using the path configured in the config file.
## finalizeAggregation ## finalizeAggregation {#function-finalizeaggregation}
Takes the state of an aggregate function. Returns the result of aggregation (the finalized state). Takes the state of an aggregate function. Returns the result of aggregation (the finalized state).

View File

@ -7,3 +7,4 @@
#cmakedefine01 USE_READLINE #cmakedefine01 USE_READLINE
#cmakedefine01 USE_LIBEDIT #cmakedefine01 USE_LIBEDIT
#cmakedefine01 HAVE_READLINE_HISTORY #cmakedefine01 HAVE_READLINE_HISTORY
#cmakedefine01 NOT_UNBUNDLED

View File

@ -4,41 +4,39 @@
#include <sstream> #include <sstream>
#include <Poco/Logger.h> #include <Poco/Logger.h>
#include <Poco/Message.h>
#include <Poco/Version.h>
#include <Core/SettingsCommon.h>
#include <Common/CurrentThread.h>
#ifndef QUERY_PREVIEW_LENGTH #ifndef QUERY_PREVIEW_LENGTH
#define QUERY_PREVIEW_LENGTH 160 #define QUERY_PREVIEW_LENGTH 160
#endif #endif
using Poco::Logger; using Poco::Logger;
using Poco::Message;
using DB::LogsLevel;
using DB::CurrentThread;
/// Logs a message to a specified logger with that level. /// Logs a message to a specified logger with that level.
#define LOG_TRACE(logger, message) do { \ #define LOG_SIMPLE(logger, message, priority, PRIORITY) do \
if ((logger)->trace()) {\ { \
std::stringstream oss_internal_rare; \ const bool is_clients_log = (CurrentThread::getGroup() != nullptr) && \
oss_internal_rare << message; \ (CurrentThread::getGroup()->client_logs_level >= (priority)); \
(logger)->trace(oss_internal_rare.str());}} while(false) if ((logger)->is((PRIORITY)) || is_clients_log) { \
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
if (auto channel = (logger)->getChannel()) { \
channel->log(Message((logger)->name(), oss_internal_rare.str(), (PRIORITY))); \
} \
} \
} while (false)
#define LOG_DEBUG(logger, message) do { \
if ((logger)->debug()) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
(logger)->debug(oss_internal_rare.str());}} while(false)
#define LOG_INFO(logger, message) do { \ #define LOG_TRACE(logger, message) LOG_SIMPLE(logger, message, LogsLevel::trace, Message::PRIO_TRACE)
if ((logger)->information()) {\ #define LOG_DEBUG(logger, message) LOG_SIMPLE(logger, message, LogsLevel::debug, Message::PRIO_DEBUG)
std::stringstream oss_internal_rare; \ #define LOG_INFO(logger, message) LOG_SIMPLE(logger, message, LogsLevel::information, Message::PRIO_INFORMATION)
oss_internal_rare << message; \ #define LOG_WARNING(logger, message) LOG_SIMPLE(logger, message, LogsLevel::warning, Message::PRIO_WARNING)
(logger)->information(oss_internal_rare.str());}} while(false) #define LOG_ERROR(logger, message) LOG_SIMPLE(logger, message, LogsLevel::error, Message::PRIO_ERROR)
#define LOG_WARNING(logger, message) do { \
if ((logger)->warning()) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
(logger)->warning(oss_internal_rare.str());}} while(false)
#define LOG_ERROR(logger, message) do { \
if ((logger)->error()) {\
std::stringstream oss_internal_rare; \
oss_internal_rare << message; \
(logger)->error(oss_internal_rare.str());}} while(false)
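
To illustrate the rewritten macros, here is a minimal hypothetical call site (the logger name, function, and include path are assumptions, not part of this diff): the message is emitted if either the logger itself accepts the priority or the current query's `client_logs_level` is at least that high, which is what forwards server-side logs to the client.

```cpp
#include <common/logger_useful.h>   /// assumed location of the LOG_* macros above
#include <Poco/Logger.h>

void exampleCallSite()
{
    Poco::Logger * log = &Poco::Logger::get("Example");   /// hypothetical logger name
    /// Expands to LOG_SIMPLE(log, ..., LogsLevel::debug, Message::PRIO_DEBUG):
    /// checked against both log->is(PRIO_DEBUG) and the query's client_logs_level.
    LOG_DEBUG(log, "Loaded " << 42 << " parts");
}
```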

View File

@ -0,0 +1,65 @@
#pragma once
#include <new>
#include <common/likely.h>
#if __has_include(<common/config_common.h>)
#include <common/config_common.h>
#endif
#if USE_JEMALLOC
#include <jemalloc/jemalloc.h>
#if JEMALLOC_VERSION_MAJOR < 4
#undef USE_JEMALLOC
#define USE_JEMALLOC 0
#include <cstdlib>
#endif
#endif
#define ALWAYS_INLINE inline __attribute__((__always_inline__))
#define NO_INLINE __attribute__((__noinline__))
namespace Memory
{
ALWAYS_INLINE void * newImpl(std::size_t size)
{
auto * ptr = malloc(size);
if (likely(ptr != nullptr))
return ptr;
/// @note no std::get_new_handler logic implemented
throw std::bad_alloc{};
}
ALWAYS_INLINE void * newNoExept(std::size_t size) noexcept
{
return malloc(size);
}
ALWAYS_INLINE void deleteImpl(void * ptr) noexcept
{
free(ptr);
}
#if USE_JEMALLOC
ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size) noexcept
{
if (unlikely(ptr == nullptr))
return;
sdallocx(ptr, size, 0);
}
#else
ALWAYS_INLINE void deleteSized(void * ptr, std::size_t size [[maybe_unused]]) noexcept
{
free(ptr);
}
#endif
}
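
These helpers become effective only when wired into the global allocation functions; that translation unit is not part of this hunk, but a minimal sketch of the wiring could look like the following (the file placement and includes are assumptions):

```cpp
#include <new>
// #include <common/memory.h>   // assumed header containing the Memory helpers above

void * operator new(std::size_t size) { return Memory::newImpl(size); }
void * operator new[](std::size_t size) { return Memory::newImpl(size); }
void * operator new(std::size_t size, const std::nothrow_t &) noexcept { return Memory::newNoExept(size); }
void operator delete(void * ptr) noexcept { Memory::deleteImpl(ptr); }
void operator delete[](void * ptr) noexcept { Memory::deleteImpl(ptr); }
/// The sized overloads are what sized deallocation lets the compiler call; with
/// jemalloc they map to sdallocx, which skips the allocation-size lookup.
void operator delete(void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
void operator delete[](void * ptr, std::size_t size) noexcept { Memory::deleteSized(ptr, size); }
```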

View File

@ -24,6 +24,7 @@ if (NOT DEFINED ENABLE_UTILS OR ENABLE_UTILS)
add_subdirectory (zookeeper-copy-tree) add_subdirectory (zookeeper-copy-tree)
add_subdirectory (zookeeper-remove-by-list) add_subdirectory (zookeeper-remove-by-list)
add_subdirectory (zookeeper-create-entry-to-download-part) add_subdirectory (zookeeper-create-entry-to-download-part)
add_subdirectory (zookeeper-adjust-block-numbers-to-parts)
add_subdirectory (wikistat-loader) add_subdirectory (wikistat-loader)
add_subdirectory (fill-factor) add_subdirectory (fill-factor)
add_subdirectory (check-marks) add_subdirectory (check-marks)

View File

@ -1,20 +1,62 @@
#include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h> #include <Storages/MergeTree/ReplicatedMergeTreeTableMetadata.h>
#include <Storages/MergeTree/MergeTreePartInfo.h> #include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Common/ZooKeeper/ZooKeeper.h> #include <Common/ZooKeeper/ZooKeeper.h>
#include <boost/algorithm/string.hpp>
#include <boost/program_options.hpp> #include <boost/program_options.hpp>
#include <IO/ReadHelpers.h> #include <IO/ReadHelpers.h>
#include <unordered_map> #include <unordered_map>
#include <cmath> #include <cmath>
size_t getMaxBlockSizeForPartition(zkutil::ZooKeeper & zk,
std::vector<std::string> getAllShards(zkutil::ZooKeeper & zk, const std::string & root)
{
return zk.getChildren(root);
}
std::vector<std::string> removeNotExistingShards(zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards)
{
auto existing_shards = getAllShards(zk, root);
std::vector<std::string> filtered_shards;
filtered_shards.reserve(shards.size());
for (const auto & shard : shards)
if (std::find(existing_shards.begin(), existing_shards.end(), shard) == existing_shards.end())
std::cerr << "Shard " << shard << " not found." << std::endl;
else
filtered_shards.emplace_back(shard);
return filtered_shards;
}
std::vector<std::string> getAllTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard)
{
return zk.getChildren(root + "/" + shard);
}
std::vector<std::string> removeNotExistingTables(zkutil::ZooKeeper & zk, const std::string & root, const std::string & shard, const std::vector<std::string> & tables)
{
auto existing_tables = getAllTables(zk, root, shard);
std::vector<std::string> filtered_tables;
filtered_tables.reserve(tables.size());
for (const auto & table : tables)
if (std::find(existing_tables.begin(), existing_tables.end(), table) == existing_tables.end())
std::cerr << "\tTable " << table << " not found on shard " << shard << "." << std::endl;
else
filtered_tables.emplace_back(table);
return filtered_tables;
}
Int64 getMaxBlockNumberForPartition(zkutil::ZooKeeper & zk,
const std::string & replica_path, const std::string & replica_path,
const std::string & partition_name, const std::string & partition_name,
const DB::MergeTreeDataFormatVersion & format_version) const DB::MergeTreeDataFormatVersion & format_version)
{ {
auto replicas_path = replica_path + "/replicas"; auto replicas_path = replica_path + "/replicas";
auto replica_hosts = zk.getChildren(replicas_path); auto replica_hosts = zk.getChildren(replicas_path);
size_t max_block_num = 0; Int64 max_block_num = 0;
for (const auto & replica_host : replica_hosts) for (const auto & replica_host : replica_hosts)
{ {
auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts"); auto parts = zk.getChildren(replicas_path + "/" + replica_host + "/parts");
@ -24,40 +66,78 @@ size_t getMaxBlockSizeForPartition(zkutil::ZooKeeper & zk,
{ {
auto info = DB::MergeTreePartInfo::fromPartName(part, format_version); auto info = DB::MergeTreePartInfo::fromPartName(part, format_version);
if (info.partition_id == partition_name) if (info.partition_id == partition_name)
max_block_num = std::max<UInt64>(info.max_block, max_block_num); max_block_num = std::max<Int64>(info.max_block, max_block_num);
} }
catch (const DB::Exception & ex) catch (const DB::Exception & ex)
{ {
std::cerr << "Exception on: " << ex.displayText() << " will skip part: " << part << std::endl; std::cerr << ex.displayText() << ", Part " << part << "skipped." << std::endl;
} }
} }
} }
return max_block_num; return max_block_num;
} }
std::unordered_map<std::string, size_t> getAllTablesBlockPaths(zkutil::ZooKeeper & zk, const std::string & root)
Int64 getCurrentBlockNumberForPartition(zkutil::ZooKeeper & zk, const std::string & part_path)
{ {
std::unordered_map<std::string, size_t> result; Coordination::Stat stat;
auto shards = zk.getChildren(root); zk.get(part_path, &stat);
for (const auto & shard : shards)
/// References:
/// https://stackoverflow.com/a/10347910
/// https://bowenli86.github.io/2016/07/07/distributed%20system/zookeeper/How-does-ZooKeeper-s-persistent-sequential-id-work/
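/// Rationale: cversion is incremented by every create and every delete of a child, while
/// numChildren equals creates minus deletes, so creates = (cversion + numChildren) / 2,
/// which is exactly the next sequential node number.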
return (stat.cversion + stat.numChildren) / 2;
}
std::unordered_map<std::string, Int64> getPartitionsNeedAdjustingBlockNumbers(
zkutil::ZooKeeper & zk, const std::string & root, const std::vector<std::string> & shards, const std::vector<std::string> & tables)
{
std::unordered_map<std::string, Int64> result;
std::vector<std::string> use_shards = shards.empty() ? getAllShards(zk, root) : removeNotExistingShards(zk, root, shards);
for (const auto & shard : use_shards)
{ {
std::string shard_path = root + "/" + shard; std::cout << "Shard: " << shard << std::endl;
auto tables = zk.getChildren(shard_path); std::vector<std::string> use_tables = tables.empty() ? getAllTables(zk, root, shard) : removeNotExistingTables(zk, root, shard, tables);
for (auto table : tables)
for (auto table : use_tables)
{ {
std::cerr << "Searching for nodes in: " << table << std::endl; std::cout << "\tTable: " << table << std::endl;
std::string table_path = shard_path + "/" + table; std::string table_path = root + "/" + shard + "/" + table;
auto format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version;
std::string blocks_path = table_path + "/block_numbers"; std::string blocks_path = table_path + "/block_numbers";
auto partitions = zk.getChildren(blocks_path);
if (!partitions.empty()) std::vector<std::string> partitions;
DB::MergeTreeDataFormatVersion format_version;
try
{ {
for (auto partition : partitions) format_version = DB::ReplicatedMergeTreeTableMetadata::parse(zk.get(table_path + "/metadata")).data_format_version;
partitions = zk.getChildren(blocks_path);
}
catch (const DB::Exception & ex)
{
std::cerr << ex.displayText() << ", table " << table << " skipped." << std::endl;
continue;
}
for (auto partition : partitions)
{
try
{ {
std::string part_path = blocks_path + "/" + partition; std::string part_path = blocks_path + "/" + partition;
size_t partition_max_block = getMaxBlockSizeForPartition(zk, table_path, partition, format_version); Int64 partition_max_block = getMaxBlockNumberForPartition(zk, table_path, partition, format_version);
std::cerr << "\tFound max block number: " << partition_max_block << " for part: " << partition << std::endl; Int64 current_block_number = getCurrentBlockNumberForPartition(zk, part_path);
result.emplace(part_path, partition_max_block); if (current_block_number < partition_max_block + 1)
{
std::cout << "\t\tPartition: " << partition << ": current block_number: " << current_block_number
<< ", max block number: " << partition_max_block << ". Adjusting is required." << std::endl;
result.emplace(part_path, partition_max_block);
}
}
catch (const DB::Exception & ex)
{
std::cerr << ex.displayText() << ", partition " << partition << " skipped." << std::endl;
} }
} }
} }
@ -66,67 +146,137 @@ std::unordered_map<std::string, size_t> getAllTablesBlockPaths(zkutil::ZooKeeper
} }
void rotateNodes(zkutil::ZooKeeper & zk, const std::string & path, size_t max_block_num) void setCurrentBlockNumber(zkutil::ZooKeeper & zk, const std::string & path, Int64 new_current_block_number)
{ {
Coordination::Requests requests; Int64 current_block_number = getCurrentBlockNumberForPartition(zk, path);
std::string block_prefix = path + "/block-";
std::string current = zk.create(block_prefix, "", zkutil::CreateMode::EphemeralSequential);
size_t current_block_num = DB::parse<UInt64>(current.c_str() + block_prefix.size(), current.size() - block_prefix.size());
if (current_block_num >= max_block_num)
{
std::cerr << "Nothing to rotate, current block num: " << current_block_num << " max_block_num:" << max_block_num << std::endl;
return;
}
size_t need_to_rotate = max_block_num - current_block_num; auto create_ephemeral_nodes = [&](size_t count)
std::cerr << "Will rotate: " << need_to_rotate << " block numbers from " << current_block_num << " to " << max_block_num << std::endl;
for (size_t i = 0; i < need_to_rotate; ++i)
{ {
if (requests.size() == 50) std::string block_prefix = path + "/block-";
Coordination::Requests requests;
requests.reserve(count);
for (size_t i = 0; i != count; ++i)
requests.emplace_back(zkutil::makeCreateRequest(block_prefix, "", zkutil::CreateMode::EphemeralSequential));
auto responses = zk.multi(requests);
std::vector<std::string> paths_created;
paths_created.reserve(responses.size());
for (const auto & response : responses)
{ {
std::cerr << "Rotating: " << i << " block numbers" << std::endl; const auto * create_response = dynamic_cast<Coordination::CreateResponse*>(response.get());
zk.multi(requests); if (!create_response)
requests.clear(); {
std::cerr << "\tCould not create ephemeral node " << block_prefix << std::endl;
return false;
}
paths_created.emplace_back(create_response->path_created);
} }
requests.emplace_back(zkutil::makeCreateRequest(path + "/block-", "", zkutil::CreateMode::EphemeralSequential));
} std::sort(paths_created.begin(), paths_created.end());
if (!requests.empty()) for (const auto & path_created : paths_created)
{ {
zk.multi(requests); Int64 number = DB::parse<Int64>(path_created.c_str() + block_prefix.size(), path_created.size() - block_prefix.size());
} if (number != current_block_number)
{
char suffix[11] = "";
sprintf(suffix, "%010ld", current_block_number);
std::string expected_path = block_prefix + suffix;
std::cerr << "\t" << path_created << ": Ephemeral node has been created with an unexpected path (expected something like "
<< expected_path << ")." << std::endl;
return false;
}
std::cout << "\t" << path_created << std::endl;
++current_block_number;
}
return true;
};
if (current_block_number >= new_current_block_number)
return;
std::cout << "Creating ephemeral sequential nodes:" << std::endl;
create_ephemeral_nodes(1); /// Firstly try to create just a single node.
/// Create other nodes in batches of 50 nodes.
while (current_block_number + 50 <= new_current_block_number)
create_ephemeral_nodes(50);
create_ephemeral_nodes(new_current_block_number - current_block_number);
} }
int main(int argc, char ** argv) int main(int argc, char ** argv)
try try
{ {
boost::program_options::options_description desc("Allowed options"); /// Parse the command line.
namespace po = boost::program_options;
po::options_description desc("Allowed options");
desc.add_options() desc.add_options()
("help,h", "produce help message") ("help,h", "show help")
("address,a", boost::program_options::value<std::string>()->required(), "addresses of ZooKeeper instances, comma separated. Example: example01e.yandex.ru:2181") ("zookeeper,z", po::value<std::string>(), "Addresses of ZooKeeper instances, comma-separated. Example: example01e.yandex.ru:2181")
("path,p", boost::program_options::value<std::string>()->required(), "path of replica queue to insert node (without trailing slash)"); ("path,p", po::value<std::string>(), "[optional] Path of replica queue to insert node (without trailing slash). By default it's /clickhouse/tables")
("shard,s", po::value<std::string>(), "[optional] Shards to process, comma-separated. If not specified then the utility will process all the shards.")
("table,t", po::value<std::string>(), "[optional] Tables to process, comma-separated. If not specified then the utility will process all the tables.")
("dry-run", "[optional] Specify if you want this utility just to analyze block numbers without any changes.");
boost::program_options::variables_map options; po::variables_map options;
boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), options); po::store(po::parse_command_line(argc, argv, desc), options);
if (options.count("help")) auto show_usage = [&]
{ {
std::cout << "Util for /block_numbers node adjust with max block number in partition" << std::endl; std::cout << "Usage: " << std::endl;
std::cout << "Usage: " << argv[0] << " [options]" << std::endl; std::cout << " " << argv[0] << " [options]" << std::endl;
std::cout << desc << std::endl; std::cout << desc << std::endl;
};
if (options.count("help") || (argc == 1))
{
std::cout << "This utility adjusts the /block_numbers zookeeper nodes to the correct block number in partition." << std::endl;
std::cout << "It might be useful when incorrect block numbers stored in zookeeper don't allow you to insert data into a table or drop/detach a partition." << std::endl;
show_usage();
return 0;
}
if (!options.count("zookeeper"))
{
std::cerr << "Option --zookeeper should be set." << std::endl;
show_usage();
return 1; return 1;
} }
std::string global_path = options.at("path").as<std::string>(); std::string root = options.count("path") ? options.at("path").as<std::string>() : "/clickhouse/tables";
zkutil::ZooKeeper zookeeper(options.at("address").as<std::string>()); std::vector<std::string> shards, tables;
if (options.count("shard"))
boost::split(shards, options.at("shard").as<std::string>(), boost::algorithm::is_any_of(","));
if (options.count("table"))
boost::split(tables, options.at("table").as<std::string>(), boost::algorithm::is_any_of(","));
auto all_path = getAllTablesBlockPaths(zookeeper, global_path); /// Check if the adjusting of the block numbers is required.
for (const auto & [path, max_block_num] : all_path) std::cout << "Checking if adjusting of the block numbers is required:" << std::endl;
zkutil::ZooKeeper zookeeper(options.at("zookeeper").as<std::string>());
auto part_paths_with_max_block_numbers = getPartitionsNeedAdjustingBlockNumbers(zookeeper, root, shards, tables);
if (part_paths_with_max_block_numbers.empty())
{ {
std::cerr << "Rotating on: " << path << std::endl; std::cout << "No adjusting required." << std::endl;
rotateNodes(zookeeper, path, max_block_num); return 0;
} }
std::cout << "Required adjusting of " << part_paths_with_max_block_numbers.size() << " block numbers." << std::endl;
/// Adjust the block numbers.
if (options.count("dry-run"))
{
std::cout << "This is a dry-run, exiting." << std::endl;
return 0;
}
std::cout << std::endl << "Adjusting the block numbers:" << std::endl;
for (const auto & [part_path, max_block_number] : part_paths_with_max_block_numbers)
setCurrentBlockNumber(zookeeper, part_path, max_block_number + 1);
return 0; return 0;
} }
catch (const Poco::Exception & e) catch (const Poco::Exception & e)