Merge branch 'master' into decimal_hex_function

This commit is contained in:
millb 2019-10-21 11:13:51 +03:00
commit 2a183c3d07
415 changed files with 6379 additions and 1907 deletions

View File

@ -1,2 +0,0 @@
- regExp: ".*\\.md$"
labels: ["documentation", "pr-documentation"]

1
.github/labeler.keywords.yml vendored Normal file
View File

@ -0,0 +1 @@
pr-feature: "New Feature"

23
.github/labeler.yml vendored Normal file
View File

@ -0,0 +1,23 @@
# Build changes
pr-build:
- "**/CMakeLists.txt"
# Documentation PRs
documentation:
- "**/*.md"
- "docs/**/*"
pr-documentation:
- "**/*.md"
- "docs/**/*"
# Component labels
comp-mutations:
- "**/*Mutation*"
comp-matview:
- "**/*MaterializedView*"
comp-skipidx:
- "**/*Indices*"
comp-kafka:
- "dbms/src/Storages/Kafka/**/*"
- "dbms/tests/integration/test_storage_kafka/**/*"
- "utils/kafka/**/*"

View File

@ -1,9 +0,0 @@
workflow "Main workflow" {
resolves = ["Label PR"]
on = "pull_request"
}
action "Label PR" {
uses = "decathlon/pull-request-labeler-action@v1.0.0"
secrets = ["GITHUB_TOKEN"]
}

67
.github/stale.yml vendored Normal file
View File

@ -0,0 +1,67 @@
# Configuration for probot-stale - https://github.com/probot/stale
# Number of days of inactivity before an Issue or Pull Request becomes stale
daysUntilStale: 45
# Number of days of inactivity before an Issue or Pull Request with the stale label is closed.
# Set to false to disable. If disabled, issues still need to be closed manually, but will remain marked as stale.
daysUntilClose: 30
# Only issues or pull requests with all of these labels are check if stale. Defaults to `[]` (disabled)
onlyLabels: []
# Issues or Pull Requests with these labels will never be considered stale. Set to `[]` to disable
exemptLabels:
- bug
- feature
- memory
- performance
- prio-crit
- prio-major
- st-accepted
- st-in-progress
- st-waiting-for-fix
# Set to true to ignore issues in a project (defaults to false)
exemptProjects: false
# Set to true to ignore issues in a milestone (defaults to false)
exemptMilestones: false
# Set to true to ignore issues with an assignee (defaults to false)
exemptAssignees: false
# Label to use when marking as stale
staleLabel: stale
# Comment to post when marking as stale. Set to `false` to disable
markComment: >
This issue has been automatically marked as stale because it has not had
recent activity. It will be closed if no further activity occurs. Thank you
for your contributions.
# Comment to post when removing the stale label.
# unmarkComment: >
# Your comment here.
# Comment to post when closing a stale Issue or Pull Request.
# closeComment: >
# Your comment here.
# Limit the number of actions per hour, from 1-30. Default is 30
limitPerRun: 30
# Limit to only `issues` or `pulls`
# only: issues
# Optionally, specify configuration settings that are specific to just 'issues' or 'pulls':
pulls:
daysUntilStale: 365
markComment: >
This pull request has been automatically marked as stale because it has not had
any activity for over a year. It will be closed if no further activity occurs. Thank you
for your contributions.
# issues:
# exemptLabels:
# - confirmed

11
.github/workflows/labeler.yml vendored Normal file
View File

@ -0,0 +1,11 @@
name: "Pull Request Labeler"
on:
pull_request
jobs:
by-filename:
runs-on: ubuntu-latest
steps:
- uses: "actions/labeler@v2"
with:
repo-token: "${{ secrets.GITHUB_TOKEN }}"

27
.potato.yml Normal file
View File

@ -0,0 +1,27 @@
# This is the configuration file with settings for Potato.
# Potato is an internal Yandex technology that allows us to sync internal [Yandex.Tracker](https://yandex.com/tracker/) and GitHub.
# For all PRs where documentation is needed, just add a 'pr-feature' label and we will include it into documentation sprints.
# The project name.
name: clickhouse
# Object handlers defines which handlers we use.
handlers:
# The handler for creating an Yandex.Tracker issue.
- name: issue-create
params:
triggers:
# The trigger for creating the Yandex.Tracker issue. When the specified event occurs, it transfers PR data to Yandex.Tracker.
github:pullRequest:labeled:
data:
# The Yandex.Tracker queue to create the issue in. Each issue in Tracker belongs to one of the project queues.
queue: CLICKHOUSEDOCS
# The issue title.
summary: '[Potato] Pull Request #{{pullRequest.number}}'
# The issue description.
description: >
{{pullRequest.description}}
Ссылка на Pull Request: {{pullRequest.webUrl}}
# The condition for creating the Yandex.Tracker issue.
condition: eventPayload.labels.filter(label => ['pr-feature'].includes(label.name)).length

View File

@ -118,16 +118,16 @@ endif ()
option (ENABLE_TESTS "Enables tests" ON)
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
if (ARCH_AMD64)
option (USE_INTERNAL_MEMCPY "Use internal implementation of 'memcpy' function instead of provided by libc. Only for x86_64." ON)
endif ()
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND CMAKE_VERSION VERSION_GREATER "3.9.0")
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
endif ()
if (OS_LINUX AND NOT UNBUNDLED AND MAKE_STATIC_LIBRARIES AND NOT SPLIT_SHARED_LIBRARIES AND CMAKE_VERSION VERSION_GREATER "3.9.0")
option (GLIBC_COMPATIBILITY "Set to TRUE to enable compatibility with older glibc libraries. Only for x86_64, Linux. Implies USE_INTERNAL_MEMCPY." ON)
endif ()
if (NOT CMAKE_VERSION VERSION_GREATER "3.9.0")
message (WARNING "CMake version must be greater than 3.9.0 for production builds.")
endif ()
if (NOT CMAKE_VERSION VERSION_GREATER "3.9.0")
message (WARNING "CMake version must be greater than 3.9.0 for production builds.")
endif ()
# Make sure the final executable has symbols exported

View File

@ -13,8 +13,6 @@ ClickHouse is an open-source column-oriented database management system that all
* You can also [fill this form](https://forms.yandex.com/surveys/meet-yandex-clickhouse-team/) to meet Yandex ClickHouse team in person.
## Upcoming Events
* [ClickHouse Meetup in Hong Kong](https://www.meetup.com/Hong-Kong-Machine-Learning-Meetup/events/263580542/) on October 17.
* [ClickHouse Meetup in Shenzhen](https://www.huodongxing.com/event/3483759917300) on October 20.
* [ClickHouse Meetup in Shanghai](https://www.huodongxing.com/event/4483760336000) on October 27.
* [ClickHouse Meetup in Tokyo](https://clickhouse.connpass.com/event/147001/) on November 14.
* [ClickHouse Meetup in Istanbul](https://www.eventbrite.com/e/clickhouse-meetup-istanbul-create-blazing-fast-experiences-w-clickhouse-tickets-73101120419) on November 19.

View File

@ -1,3 +1,6 @@
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
set (ARCH_AMD64 1)
endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
set (ARCH_AARCH64 1)
endif ()

View File

@ -45,9 +45,15 @@ endif ()
add_library(jemalloc STATIC ${SRCS})
target_include_directories(jemalloc PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/include
${CMAKE_CURRENT_SOURCE_DIR}/include_linux_x86_64) # jemalloc.h
target_include_directories(jemalloc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include)
if (ARCH_AMD64)
target_include_directories(jemalloc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include_linux_x86_64)
elseif (ARCH_ARM)
target_include_directories(jemalloc PUBLIC ${CMAKE_CURRENT_SOURCE_DIR}/include_linux_aarch64)
else ()
message (FATAL_ERROR "jemalloc can only be used on x86_64 or aarch64.")
endif ()
target_include_directories(jemalloc PRIVATE
${JEMALLOC_SOURCE_DIR}/include)

View File

@ -0,0 +1,7 @@
Here are pre-generated files from jemalloc on Linux aarch64.
You can obtain these files by running ./autogen.sh inside jemalloc source directory.
Added #define GNU_SOURCE
Added JEMALLOC_OVERRIDE___POSIX_MEMALIGN because why not.
Removed JEMALLOC_HAVE_ATTR_FORMAT_GNU_PRINTF because it's non standard.
Removed JEMALLOC_PURGE_MADVISE_FREE because it's available only from Linux 4.5.

View File

@ -0,0 +1,382 @@
/* include/jemalloc/internal/jemalloc_internal_defs.h. Generated from jemalloc_internal_defs.h.in by configure. */
#ifndef JEMALLOC_INTERNAL_DEFS_H_
#define JEMALLOC_INTERNAL_DEFS_H_
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/*
* If JEMALLOC_PREFIX is defined via --with-jemalloc-prefix, it will cause all
* public APIs to be prefixed. This makes it possible, with some care, to use
* multiple allocators simultaneously.
*/
/* #undef JEMALLOC_PREFIX */
/* #undef JEMALLOC_CPREFIX */
/*
* Define overrides for non-standard allocator-related functions if they are
* present on the system.
*/
#define JEMALLOC_OVERRIDE___LIBC_CALLOC
#define JEMALLOC_OVERRIDE___LIBC_FREE
#define JEMALLOC_OVERRIDE___LIBC_MALLOC
#define JEMALLOC_OVERRIDE___LIBC_MEMALIGN
#define JEMALLOC_OVERRIDE___LIBC_REALLOC
#define JEMALLOC_OVERRIDE___LIBC_VALLOC
#define JEMALLOC_OVERRIDE___POSIX_MEMALIGN
/*
* JEMALLOC_PRIVATE_NAMESPACE is used as a prefix for all library-private APIs.
* For shared libraries, symbol visibility mechanisms prevent these symbols
* from being exported, but for static libraries, naming collisions are a real
* possibility.
*/
#define JEMALLOC_PRIVATE_NAMESPACE je_
/*
* Hyper-threaded CPUs may need a special instruction inside spin loops in
* order to yield to another virtual CPU.
*/
#define CPU_SPINWAIT
/* 1 if CPU_SPINWAIT is defined, 0 otherwise. */
#define HAVE_CPU_SPINWAIT 0
/*
* Number of significant bits in virtual addresses. This may be less than the
* total number of bits in a pointer, e.g. on x64, for which the uppermost 16
* bits are the same as bit 47.
*/
#define LG_VADDR 48
/* Defined if C11 atomics are available. */
#define JEMALLOC_C11_ATOMICS 1
/* Defined if GCC __atomic atomics are available. */
#define JEMALLOC_GCC_ATOMIC_ATOMICS 1
/* Defined if GCC __sync atomics are available. */
#define JEMALLOC_GCC_SYNC_ATOMICS 1
/*
* Defined if __sync_add_and_fetch(uint32_t *, uint32_t) and
* __sync_sub_and_fetch(uint32_t *, uint32_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_4 */
/*
* Defined if __sync_add_and_fetch(uint64_t *, uint64_t) and
* __sync_sub_and_fetch(uint64_t *, uint64_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_8 */
/*
* Defined if __builtin_clz() and __builtin_clzl() are available.
*/
#define JEMALLOC_HAVE_BUILTIN_CLZ
/*
* Defined if os_unfair_lock_*() functions are available, as provided by Darwin.
*/
/* #undef JEMALLOC_OS_UNFAIR_LOCK */
/*
* Defined if OSSpin*() functions are available, as provided by Darwin, and
* documented in the spinlock(3) manual page.
*/
/* #undef JEMALLOC_OSSPIN */
/* Defined if syscall(2) is usable. */
#define JEMALLOC_USE_SYSCALL
/*
* Defined if secure_getenv(3) is available.
*/
#define JEMALLOC_HAVE_SECURE_GETENV
/*
* Defined if issetugid(2) is available.
*/
/* #undef JEMALLOC_HAVE_ISSETUGID */
/* Defined if pthread_atfork(3) is available. */
#define JEMALLOC_HAVE_PTHREAD_ATFORK
/* Defined if pthread_setname_np(3) is available. */
#define JEMALLOC_HAVE_PTHREAD_SETNAME_NP
/*
* Defined if clock_gettime(CLOCK_MONOTONIC_COARSE, ...) is available.
*/
#define JEMALLOC_HAVE_CLOCK_MONOTONIC_COARSE 1
/*
* Defined if clock_gettime(CLOCK_MONOTONIC, ...) is available.
*/
#define JEMALLOC_HAVE_CLOCK_MONOTONIC 1
/*
* Defined if mach_absolute_time() is available.
*/
/* #undef JEMALLOC_HAVE_MACH_ABSOLUTE_TIME */
/*
* Defined if _malloc_thread_cleanup() exists. At least in the case of
* FreeBSD, pthread_key_create() allocates, which if used during malloc
* bootstrapping will cause recursion into the pthreads library. Therefore, if
* _malloc_thread_cleanup() exists, use it as the basis for thread cleanup in
* malloc_tsd.
*/
/* #undef JEMALLOC_MALLOC_THREAD_CLEANUP */
/*
* Defined if threaded initialization is known to be safe on this platform.
* Among other things, it must be possible to initialize a mutex without
* triggering allocation in order for threaded allocation to be safe.
*/
#define JEMALLOC_THREADED_INIT
/*
* Defined if the pthreads implementation defines
* _pthread_mutex_init_calloc_cb(), in which case the function is used in order
* to avoid recursive allocation during mutex initialization.
*/
/* #undef JEMALLOC_MUTEX_INIT_CB */
/* Non-empty if the tls_model attribute is supported. */
#define JEMALLOC_TLS_MODEL __attribute__((tls_model("initial-exec")))
/*
* JEMALLOC_DEBUG enables assertions and other sanity checks, and disables
* inline functions.
*/
/* #undef JEMALLOC_DEBUG */
/* JEMALLOC_STATS enables statistics calculation. */
#define JEMALLOC_STATS
/* JEMALLOC_EXPERIMENTAL_SMALLOCX_API enables experimental smallocx API. */
/* #undef JEMALLOC_EXPERIMENTAL_SMALLOCX_API */
/* JEMALLOC_PROF enables allocation profiling. */
/* #undef JEMALLOC_PROF */
/* Use libunwind for profile backtracing if defined. */
/* #undef JEMALLOC_PROF_LIBUNWIND */
/* Use libgcc for profile backtracing if defined. */
/* #undef JEMALLOC_PROF_LIBGCC */
/* Use gcc intrinsics for profile backtracing if defined. */
/* #undef JEMALLOC_PROF_GCC */
/*
* JEMALLOC_DSS enables use of sbrk(2) to allocate extents from the data storage
* segment (DSS).
*/
#define JEMALLOC_DSS
/* Support memory filling (junk/zero). */
#define JEMALLOC_FILL
/* Support utrace(2)-based tracing. */
/* #undef JEMALLOC_UTRACE */
/* Support optional abort() on OOM. */
/* #undef JEMALLOC_XMALLOC */
/* Support lazy locking (avoid locking unless a second thread is launched). */
/* #undef JEMALLOC_LAZY_LOCK */
/*
* Minimum allocation alignment is 2^LG_QUANTUM bytes (ignoring tiny size
* classes).
*/
/* #undef LG_QUANTUM */
/* One page is 2^LG_PAGE bytes. */
#define LG_PAGE 16
/*
* One huge page is 2^LG_HUGEPAGE bytes. Note that this is defined even if the
* system does not explicitly support huge pages; system calls that require
* explicit huge page support are separately configured.
*/
#define LG_HUGEPAGE 29
/*
* If defined, adjacent virtual memory mappings with identical attributes
* automatically coalesce, and they fragment when changes are made to subranges.
* This is the normal order of things for mmap()/munmap(), but on Windows
* VirtualAlloc()/VirtualFree() operations must be precisely matched, i.e.
* mappings do *not* coalesce/fragment.
*/
#define JEMALLOC_MAPS_COALESCE
/*
* If defined, retain memory for later reuse by default rather than using e.g.
* munmap() to unmap freed extents. This is enabled on 64-bit Linux because
* common sequences of mmap()/munmap() calls will cause virtual memory map
* holes.
*/
#define JEMALLOC_RETAIN
/* TLS is used to map arenas and magazine caches to threads. */
#define JEMALLOC_TLS
/*
* Used to mark unreachable code to quiet "end of non-void" compiler warnings.
* Don't use this directly; instead use unreachable() from util.h
*/
#define JEMALLOC_INTERNAL_UNREACHABLE __builtin_unreachable
/*
* ffs*() functions to use for bitmapping. Don't use these directly; instead,
* use ffs_*() from util.h.
*/
#define JEMALLOC_INTERNAL_FFSLL __builtin_ffsll
#define JEMALLOC_INTERNAL_FFSL __builtin_ffsl
#define JEMALLOC_INTERNAL_FFS __builtin_ffs
/*
* If defined, explicitly attempt to more uniformly distribute large allocation
* pointer alignments across all cache indices.
*/
#define JEMALLOC_CACHE_OBLIVIOUS
/*
* If defined, enable logging facilities. We make this a configure option to
* avoid taking extra branches everywhere.
*/
/* #undef JEMALLOC_LOG */
/*
* If defined, use readlinkat() (instead of readlink()) to follow
* /etc/malloc_conf.
*/
/* #undef JEMALLOC_READLINKAT */
/*
* Darwin (OS X) uses zones to work around Mach-O symbol override shortcomings.
*/
/* #undef JEMALLOC_ZONE */
/*
* Methods for determining whether the OS overcommits.
* JEMALLOC_PROC_SYS_VM_OVERCOMMIT_MEMORY: Linux's
* /proc/sys/vm.overcommit_memory file.
* JEMALLOC_SYSCTL_VM_OVERCOMMIT: FreeBSD's vm.overcommit sysctl.
*/
/* #undef JEMALLOC_SYSCTL_VM_OVERCOMMIT */
#define JEMALLOC_PROC_SYS_VM_OVERCOMMIT_MEMORY
/* Defined if madvise(2) is available. */
#define JEMALLOC_HAVE_MADVISE
/*
* Defined if transparent huge pages are supported via the MADV_[NO]HUGEPAGE
* arguments to madvise(2).
*/
#define JEMALLOC_HAVE_MADVISE_HUGE
/*
* Methods for purging unused pages differ between operating systems.
*
* madvise(..., MADV_FREE) : This marks pages as being unused, such that they
* will be discarded rather than swapped out.
* madvise(..., MADV_DONTNEED) : If JEMALLOC_PURGE_MADVISE_DONTNEED_ZEROS is
* defined, this immediately discards pages,
* such that new pages will be demand-zeroed if
* the address region is later touched;
* otherwise this behaves similarly to
* MADV_FREE, though typically with higher
* system overhead.
*/
#define JEMALLOC_PURGE_MADVISE_FREE
#define JEMALLOC_PURGE_MADVISE_DONTNEED
#define JEMALLOC_PURGE_MADVISE_DONTNEED_ZEROS
/* Defined if madvise(2) is available but MADV_FREE is not (x86 Linux only). */
/* #undef JEMALLOC_DEFINE_MADVISE_FREE */
/*
* Defined if MADV_DO[NT]DUMP is supported as an argument to madvise.
*/
#define JEMALLOC_MADVISE_DONTDUMP
/*
* Defined if transparent huge pages (THPs) are supported via the
* MADV_[NO]HUGEPAGE arguments to madvise(2), and THP support is enabled.
*/
/* #undef JEMALLOC_THP */
/* Define if operating system has alloca.h header. */
#define JEMALLOC_HAS_ALLOCA_H 1
/* C99 restrict keyword supported. */
#define JEMALLOC_HAS_RESTRICT 1
/* For use by hash code. */
/* #undef JEMALLOC_BIG_ENDIAN */
/* sizeof(int) == 2^LG_SIZEOF_INT. */
#define LG_SIZEOF_INT 2
/* sizeof(long) == 2^LG_SIZEOF_LONG. */
#define LG_SIZEOF_LONG 3
/* sizeof(long long) == 2^LG_SIZEOF_LONG_LONG. */
#define LG_SIZEOF_LONG_LONG 3
/* sizeof(intmax_t) == 2^LG_SIZEOF_INTMAX_T. */
#define LG_SIZEOF_INTMAX_T 3
/* glibc malloc hooks (__malloc_hook, __realloc_hook, __free_hook). */
#define JEMALLOC_GLIBC_MALLOC_HOOK
/* glibc memalign hook. */
#define JEMALLOC_GLIBC_MEMALIGN_HOOK
/* pthread support */
#define JEMALLOC_HAVE_PTHREAD
/* dlsym() support */
#define JEMALLOC_HAVE_DLSYM
/* Adaptive mutex support in pthreads. */
#define JEMALLOC_HAVE_PTHREAD_MUTEX_ADAPTIVE_NP
/* GNU specific sched_getcpu support */
#define JEMALLOC_HAVE_SCHED_GETCPU
/* GNU specific sched_setaffinity support */
#define JEMALLOC_HAVE_SCHED_SETAFFINITY
/*
* If defined, all the features necessary for background threads are present.
*/
#define JEMALLOC_BACKGROUND_THREAD 1
/*
* If defined, jemalloc symbols are not exported (doesn't work when
* JEMALLOC_PREFIX is not defined).
*/
/* #undef JEMALLOC_EXPORT */
/* config.malloc_conf options string. */
#define JEMALLOC_CONFIG_MALLOC_CONF ""
/* If defined, jemalloc takes the malloc/free/etc. symbol names. */
#define JEMALLOC_IS_MALLOC 1
/*
* Defined if strerror_r returns char * if _GNU_SOURCE is defined.
*/
#define JEMALLOC_STRERROR_R_RETURNS_CHAR_WITH_GNU_SOURCE
#endif /* JEMALLOC_INTERNAL_DEFS_H_ */

View File

@ -0,0 +1,194 @@
#ifndef JEMALLOC_PREAMBLE_H
#define JEMALLOC_PREAMBLE_H
#include "jemalloc_internal_defs.h"
#include "jemalloc/internal/jemalloc_internal_decls.h"
#ifdef JEMALLOC_UTRACE
#include <sys/ktrace.h>
#endif
#define JEMALLOC_NO_DEMANGLE
#ifdef JEMALLOC_JET
# undef JEMALLOC_IS_MALLOC
# define JEMALLOC_N(n) jet_##n
# include "jemalloc/internal/public_namespace.h"
# define JEMALLOC_NO_RENAME
# include "jemalloc/jemalloc.h"
# undef JEMALLOC_NO_RENAME
#else
# define JEMALLOC_N(n) je_##n
# include "jemalloc/jemalloc.h"
#endif
#if (defined(JEMALLOC_OSATOMIC) || defined(JEMALLOC_OSSPIN))
#include <libkern/OSAtomic.h>
#endif
#ifdef JEMALLOC_ZONE
#include <mach/mach_error.h>
#include <mach/mach_init.h>
#include <mach/vm_map.h>
#endif
#include "jemalloc/internal/jemalloc_internal_macros.h"
/*
* Note that the ordering matters here; the hook itself is name-mangled. We
* want the inclusion of hooks to happen early, so that we hook as much as
* possible.
*/
#ifndef JEMALLOC_NO_PRIVATE_NAMESPACE
# ifndef JEMALLOC_JET
# include "jemalloc/internal/private_namespace.h"
# else
# include "jemalloc/internal/private_namespace_jet.h"
# endif
#endif
#include "jemalloc/internal/test_hooks.h"
#ifdef JEMALLOC_DEFINE_MADVISE_FREE
# define JEMALLOC_MADV_FREE 8
#endif
static const bool config_debug =
#ifdef JEMALLOC_DEBUG
true
#else
false
#endif
;
static const bool have_dss =
#ifdef JEMALLOC_DSS
true
#else
false
#endif
;
static const bool have_madvise_huge =
#ifdef JEMALLOC_HAVE_MADVISE_HUGE
true
#else
false
#endif
;
static const bool config_fill =
#ifdef JEMALLOC_FILL
true
#else
false
#endif
;
static const bool config_lazy_lock =
#ifdef JEMALLOC_LAZY_LOCK
true
#else
false
#endif
;
static const char * const config_malloc_conf = JEMALLOC_CONFIG_MALLOC_CONF;
static const bool config_prof =
#ifdef JEMALLOC_PROF
true
#else
false
#endif
;
static const bool config_prof_libgcc =
#ifdef JEMALLOC_PROF_LIBGCC
true
#else
false
#endif
;
static const bool config_prof_libunwind =
#ifdef JEMALLOC_PROF_LIBUNWIND
true
#else
false
#endif
;
static const bool maps_coalesce =
#ifdef JEMALLOC_MAPS_COALESCE
true
#else
false
#endif
;
static const bool config_stats =
#ifdef JEMALLOC_STATS
true
#else
false
#endif
;
static const bool config_tls =
#ifdef JEMALLOC_TLS
true
#else
false
#endif
;
static const bool config_utrace =
#ifdef JEMALLOC_UTRACE
true
#else
false
#endif
;
static const bool config_xmalloc =
#ifdef JEMALLOC_XMALLOC
true
#else
false
#endif
;
static const bool config_cache_oblivious =
#ifdef JEMALLOC_CACHE_OBLIVIOUS
true
#else
false
#endif
;
/*
* Undocumented, for jemalloc development use only at the moment. See the note
* in jemalloc/internal/log.h.
*/
static const bool config_log =
#ifdef JEMALLOC_LOG
true
#else
false
#endif
;
#ifdef JEMALLOC_HAVE_SCHED_GETCPU
/* Currently percpu_arena depends on sched_getcpu. */
#define JEMALLOC_PERCPU_ARENA
#endif
static const bool have_percpu_arena =
#ifdef JEMALLOC_PERCPU_ARENA
true
#else
false
#endif
;
/*
* Undocumented, and not recommended; the application should take full
* responsibility for tracking provenance.
*/
static const bool force_ivsalloc =
#ifdef JEMALLOC_FORCE_IVSALLOC
true
#else
false
#endif
;
static const bool have_background_thread =
#ifdef JEMALLOC_BACKGROUND_THREAD
true
#else
false
#endif
;
#endif /* JEMALLOC_PREAMBLE_H */

View File

@ -0,0 +1,43 @@
/* include/jemalloc/jemalloc_defs.h. Generated from jemalloc_defs.h.in by configure. */
/* Defined if __attribute__((...)) syntax is supported. */
#define JEMALLOC_HAVE_ATTR
/* Defined if alloc_size attribute is supported. */
#define JEMALLOC_HAVE_ATTR_ALLOC_SIZE
/* Defined if format(printf, ...) attribute is supported. */
#define JEMALLOC_HAVE_ATTR_FORMAT_PRINTF
/*
* Define overrides for non-standard allocator-related functions if they are
* present on the system.
*/
#define JEMALLOC_OVERRIDE_MEMALIGN
#define JEMALLOC_OVERRIDE_VALLOC
/*
* At least Linux omits the "const" in:
*
* size_t malloc_usable_size(const void *ptr);
*
* Match the operating system's prototype.
*/
#define JEMALLOC_USABLE_SIZE_CONST
/*
* If defined, specify throw() for the public function prototypes when compiling
* with C++. The only justification for this is to match the prototypes that
* glibc defines.
*/
#define JEMALLOC_USE_CXX_THROW
#ifdef _MSC_VER
# ifdef _WIN64
# define LG_SIZEOF_PTR_WIN 3
# else
# define LG_SIZEOF_PTR_WIN 2
# endif
#endif
/* sizeof(void *) == 2^LG_SIZEOF_PTR. */
#define LG_SIZEOF_PTR 3

View File

@ -0,0 +1,123 @@
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <strings.h>
#define JEMALLOC_VERSION "5.1.0-97-gcd2931ad9bbd78208565716ab102e86d858c2fff"
#define JEMALLOC_VERSION_MAJOR 5
#define JEMALLOC_VERSION_MINOR 1
#define JEMALLOC_VERSION_BUGFIX 0
#define JEMALLOC_VERSION_NREV 97
#define JEMALLOC_VERSION_GID "cd2931ad9bbd78208565716ab102e86d858c2fff"
#define JEMALLOC_VERSION_GID_IDENT cd2931ad9bbd78208565716ab102e86d858c2fff
#define MALLOCX_LG_ALIGN(la) ((int)(la))
#if LG_SIZEOF_PTR == 2
# define MALLOCX_ALIGN(a) ((int)(ffs((int)(a))-1))
#else
# define MALLOCX_ALIGN(a) \
((int)(((size_t)(a) < (size_t)INT_MAX) ? ffs((int)(a))-1 : \
ffs((int)(((size_t)(a))>>32))+31))
#endif
#define MALLOCX_ZERO ((int)0x40)
/*
* Bias tcache index bits so that 0 encodes "automatic tcache management", and 1
* encodes MALLOCX_TCACHE_NONE.
*/
#define MALLOCX_TCACHE(tc) ((int)(((tc)+2) << 8))
#define MALLOCX_TCACHE_NONE MALLOCX_TCACHE(-1)
/*
* Bias arena index bits so that 0 encodes "use an automatically chosen arena".
*/
#define MALLOCX_ARENA(a) ((((int)(a))+1) << 20)
/*
* Use as arena index in "arena.<i>.{purge,decay,dss}" and
* "stats.arenas.<i>.*" mallctl interfaces to select all arenas. This
* definition is intentionally specified in raw decimal format to support
* cpp-based string concatenation, e.g.
*
* #define STRINGIFY_HELPER(x) #x
* #define STRINGIFY(x) STRINGIFY_HELPER(x)
*
* mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", NULL, NULL, NULL,
* 0);
*/
#define MALLCTL_ARENAS_ALL 4096
/*
* Use as arena index in "stats.arenas.<i>.*" mallctl interfaces to select
* destroyed arenas.
*/
#define MALLCTL_ARENAS_DESTROYED 4097
#if defined(__cplusplus) && defined(JEMALLOC_USE_CXX_THROW)
# define JEMALLOC_CXX_THROW throw()
#else
# define JEMALLOC_CXX_THROW
#endif
#if defined(_MSC_VER)
# define JEMALLOC_ATTR(s)
# define JEMALLOC_ALIGNED(s) __declspec(align(s))
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# ifndef JEMALLOC_EXPORT
# ifdef DLLEXPORT
# define JEMALLOC_EXPORT __declspec(dllexport)
# else
# define JEMALLOC_EXPORT __declspec(dllimport)
# endif
# endif
# define JEMALLOC_FORMAT_PRINTF(s, i)
# define JEMALLOC_NOINLINE __declspec(noinline)
# ifdef __cplusplus
# define JEMALLOC_NOTHROW __declspec(nothrow)
# else
# define JEMALLOC_NOTHROW
# endif
# define JEMALLOC_SECTION(s) __declspec(allocate(s))
# define JEMALLOC_RESTRICT_RETURN __declspec(restrict)
# if _MSC_VER >= 1900 && !defined(__EDG__)
# define JEMALLOC_ALLOCATOR __declspec(allocator)
# else
# define JEMALLOC_ALLOCATOR
# endif
#elif defined(JEMALLOC_HAVE_ATTR)
# define JEMALLOC_ATTR(s) __attribute__((s))
# define JEMALLOC_ALIGNED(s) JEMALLOC_ATTR(aligned(s))
# ifdef JEMALLOC_HAVE_ATTR_ALLOC_SIZE
# define JEMALLOC_ALLOC_SIZE(s) JEMALLOC_ATTR(alloc_size(s))
# define JEMALLOC_ALLOC_SIZE2(s1, s2) JEMALLOC_ATTR(alloc_size(s1, s2))
# else
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# endif
# ifndef JEMALLOC_EXPORT
# define JEMALLOC_EXPORT JEMALLOC_ATTR(visibility("default"))
# endif
# ifdef JEMALLOC_HAVE_ATTR_FORMAT_GNU_PRINTF
# define JEMALLOC_FORMAT_PRINTF(s, i) JEMALLOC_ATTR(format(gnu_printf, s, i))
# elif defined(JEMALLOC_HAVE_ATTR_FORMAT_PRINTF)
# define JEMALLOC_FORMAT_PRINTF(s, i) JEMALLOC_ATTR(format(printf, s, i))
# else
# define JEMALLOC_FORMAT_PRINTF(s, i)
# endif
# define JEMALLOC_NOINLINE JEMALLOC_ATTR(noinline)
# define JEMALLOC_NOTHROW JEMALLOC_ATTR(nothrow)
# define JEMALLOC_SECTION(s) JEMALLOC_ATTR(section(s))
# define JEMALLOC_RESTRICT_RETURN
# define JEMALLOC_ALLOCATOR
#else
# define JEMALLOC_ATTR(s)
# define JEMALLOC_ALIGNED(s)
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# define JEMALLOC_EXPORT
# define JEMALLOC_FORMAT_PRINTF(s, i)
# define JEMALLOC_NOINLINE
# define JEMALLOC_NOTHROW
# define JEMALLOC_SECTION(s)
# define JEMALLOC_RESTRICT_RETURN
# define JEMALLOC_ALLOCATOR
#endif

View File

@ -0,0 +1,66 @@
/*
* The je_ prefix on the following public symbol declarations is an artifact
* of namespace management, and should be omitted in application code unless
* JEMALLOC_NO_DEMANGLE is defined (see jemalloc_mangle.h).
*/
extern JEMALLOC_EXPORT const char *je_malloc_conf;
extern JEMALLOC_EXPORT void (*je_malloc_message)(void *cbopaque,
const char *s);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_malloc(size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE(1);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_calloc(size_t num, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE2(1, 2);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_posix_memalign(void **memptr,
size_t alignment, size_t size) JEMALLOC_CXX_THROW JEMALLOC_ATTR(nonnull(1));
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_aligned_alloc(size_t alignment,
size_t size) JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc)
JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_realloc(void *ptr, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_free(void *ptr)
JEMALLOC_CXX_THROW;
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_mallocx(size_t size, int flags)
JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE(1);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_rallocx(void *ptr, size_t size,
int flags) JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_xallocx(void *ptr, size_t size,
size_t extra, int flags);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_sallocx(const void *ptr,
int flags) JEMALLOC_ATTR(pure);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_dallocx(void *ptr, int flags);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_sdallocx(void *ptr, size_t size,
int flags);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_nallocx(size_t size, int flags)
JEMALLOC_ATTR(pure);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctl(const char *name,
void *oldp, size_t *oldlenp, void *newp, size_t newlen);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctlnametomib(const char *name,
size_t *mibp, size_t *miblenp);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctlbymib(const size_t *mib,
size_t miblen, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_malloc_stats_print(
void (*write_cb)(void *, const char *), void *je_cbopaque,
const char *opts);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_malloc_usable_size(
JEMALLOC_USABLE_SIZE_CONST void *ptr) JEMALLOC_CXX_THROW;
#ifdef JEMALLOC_OVERRIDE_MEMALIGN
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_memalign(size_t alignment, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc);
#endif
#ifdef JEMALLOC_OVERRIDE_VALLOC
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_valloc(size_t size) JEMALLOC_CXX_THROW
JEMALLOC_ATTR(malloc);
#endif

View File

@ -0,0 +1,77 @@
typedef struct extent_hooks_s extent_hooks_t;
/*
* void *
* extent_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size,
* size_t alignment, bool *zero, bool *commit, unsigned arena_ind);
*/
typedef void *(extent_alloc_t)(extent_hooks_t *, void *, size_t, size_t, bool *,
bool *, unsigned);
/*
* bool
* extent_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size,
* bool committed, unsigned arena_ind);
*/
typedef bool (extent_dalloc_t)(extent_hooks_t *, void *, size_t, bool,
unsigned);
/*
* void
* extent_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size,
* bool committed, unsigned arena_ind);
*/
typedef void (extent_destroy_t)(extent_hooks_t *, void *, size_t, bool,
unsigned);
/*
* bool
* extent_commit(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_commit_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
unsigned);
/*
* bool
* extent_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_decommit_t)(extent_hooks_t *, void *, size_t, size_t,
size_t, unsigned);
/*
* bool
* extent_purge(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_purge_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
unsigned);
/*
* bool
* extent_split(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t size_a, size_t size_b, bool committed, unsigned arena_ind);
*/
typedef bool (extent_split_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
bool, unsigned);
/*
* bool
* extent_merge(extent_hooks_t *extent_hooks, void *addr_a, size_t size_a,
* void *addr_b, size_t size_b, bool committed, unsigned arena_ind);
*/
typedef bool (extent_merge_t)(extent_hooks_t *, void *, size_t, void *, size_t,
bool, unsigned);
struct extent_hooks_s {
extent_alloc_t *alloc;
extent_dalloc_t *dalloc;
extent_destroy_t *destroy;
extent_commit_t *commit;
extent_decommit_t *decommit;
extent_purge_t *purge_lazy;
extent_purge_t *purge_forced;
extent_split_t *split;
extent_merge_t *merge;
};

View File

@ -62,6 +62,7 @@ set(SRCS
)
add_library(rdkafka ${SRCS})
target_compile_options(rdkafka PRIVATE -fno-sanitize=undefined)
target_include_directories(rdkafka SYSTEM PUBLIC include)
target_include_directories(rdkafka SYSTEM PUBLIC ${RDKAFKA_SOURCE_DIR}) # Because weird logic with "include_next" is used.
target_include_directories(rdkafka SYSTEM PRIVATE ${ZSTD_INCLUDE_DIR}/common) # Because wrong path to "zstd_errors.h" is used.

View File

@ -13,6 +13,7 @@
// machines.
#include "murmurhash2.h"
#include <cstring>
// Platform-specific functions and macros
// Microsoft Visual Studio
@ -48,7 +49,8 @@ uint32_t MurmurHash2(const void * key, int len, uint32_t seed)
while (len >= 4)
{
uint32_t k = *reinterpret_cast<const uint32_t *>(data);
uint32_t k;
memcpy(&k, data, sizeof(k));
k *= m;
k ^= k >> r;
k *= m;
@ -418,4 +420,4 @@ uint32_t MurmurHashAligned2(const void * key, int len, uint32_t seed)
return h;
}
}
}

View File

@ -7,6 +7,7 @@
// non-native version will be less than optimal.
#include "murmurhash3.h"
#include <cstring>
//-----------------------------------------------------------------------------
// Platform-specific functions and macros
@ -53,7 +54,9 @@ inline uint64_t rotl64 ( uint64_t x, int8_t r )
FORCE_INLINE uint32_t getblock32 ( const uint32_t * p, int i )
{
return p[i];
uint32_t res;
memcpy(&res, p + i, sizeof(res));
return res;
}
FORCE_INLINE uint64_t getblock64 ( const uint64_t * p, int i )

View File

@ -164,6 +164,7 @@ macro(add_object_library name common_path)
endif ()
endmacro()
add_object_library(clickhouse_access src/Access)
add_object_library(clickhouse_core src/Core)
add_object_library(clickhouse_compression src/Compression)
add_object_library(clickhouse_datastreams src/DataStreams)

View File

@ -274,15 +274,24 @@ private:
pcg64 generator(randomSeed());
std::uniform_int_distribution<size_t> distribution(0, queries.size() - 1);
for (size_t i = 0; i < concurrency; ++i)
try
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (size_t i = 0; i < concurrency; ++i)
{
EntryPtrs connection_entries;
connection_entries.reserve(connections.size());
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
for (const auto & connection : connections)
connection_entries.emplace_back(std::make_shared<Entry>(
connection->get(ConnectionTimeouts::getTCPTimeoutsWithoutFailover(settings))));
pool.schedule(std::bind(&Benchmark::thread, this, connection_entries));
pool.scheduleOrThrowOnError(std::bind(&Benchmark::thread, this, connection_entries));
}
}
catch (...)
{
pool.wait();
throw;
}
InterruptListener interrupt_listener;

View File

@ -895,7 +895,7 @@ public:
ThreadPool thread_pool(num_threads ? num_threads : 2 * getNumberOfPhysicalCPUCores());
for (const TaskShardPtr & task_shard : task_table.all_shards)
thread_pool.schedule([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
thread_pool.scheduleOrThrowOnError([this, timeouts, task_shard]() { discoverShardPartitions(timeouts, task_shard); });
LOG_DEBUG(log, "Waiting for " << thread_pool.active() << " setup jobs");
thread_pool.wait();
@ -2038,7 +2038,7 @@ protected:
ThreadPool thread_pool(std::min<UInt64>(num_shards, getNumberOfPhysicalCPUCores()));
for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
thread_pool.schedule([=] { do_for_shard(shard_index); });
thread_pool.scheduleOrThrowOnError([=] { do_for_shard(shard_index); });
thread_pool.wait();
}

View File

@ -19,8 +19,8 @@
#include <Common/ClickHouseRevision.h>
#include <Common/ThreadStatus.h>
#include <Common/config_version.h>
#include <Common/quoteString.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/UseSSL.h>
#include <Parsers/parseQuery.h>
@ -221,14 +221,6 @@ catch (const Exception & e)
}
inline String getQuotedString(const String & s)
{
WriteBufferFromOwnString buf;
writeQuotedString(s, buf);
return buf.str();
}
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure"))
@ -241,7 +233,7 @@ std::string LocalServer::getInitialCreateTableQuery()
if (!config().has("table-file") || config().getString("table-file") == "-") /// Use Unix tools stdin naming convention
table_file = "stdin";
else /// Use regular file
table_file = getQuotedString(config().getString("table-file"));
table_file = quoteString(config().getString("table-file"));
return
"CREATE TABLE " + table_name +

View File

@ -46,7 +46,7 @@ MySQLHandler::MySQLHandler(IServer & server_, const Poco::Net::StreamSocket & so
, connection_id(connection_id_)
, public_key(public_key_)
, private_key(private_key_)
, auth_plugin(new Authentication::Native41())
, auth_plugin(new MySQLProtocol::Authentication::Native41())
{
server_capability_flags = CLIENT_PROTOCOL_41 | CLIENT_SECURE_CONNECTION | CLIENT_PLUGIN_AUTH | CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA | CLIENT_CONNECT_WITH_DB | CLIENT_DEPRECATE_EOF;
if (ssl_enabled)
@ -231,8 +231,8 @@ void MySQLHandler::authenticate(const String & user_name, const String & auth_pl
{
// For compatibility with JavaScript MySQL client, Native41 authentication plugin is used when possible (if password is specified using double SHA1). Otherwise SHA256 plugin is used.
auto user = connection_context.getUser(user_name);
if (user->password_double_sha1_hex.empty())
auth_plugin = std::make_unique<Authentication::Sha256Password>(public_key, private_key, log);
if (user->authentication.getType() != DB::Authentication::DOUBLE_SHA1_PASSWORD)
auth_plugin = std::make_unique<MySQLProtocol::Authentication::Sha256Password>(public_key, private_key, log);
try {
std::optional<String> auth_response = auth_plugin_name == auth_plugin->getName() ? std::make_optional<String>(initial_auth_response) : std::nullopt;

View File

@ -565,7 +565,7 @@ void TCPHandler::processOrdinaryQueryWithProcessors(size_t num_threads)
auto executor = pipeline.execute();
std::atomic_bool exception = false;
pool.schedule([&]()
pool.scheduleOrThrowOnError([&]()
{
/// ThreadStatus thread_status;

View File

@ -0,0 +1,397 @@
#include <Access/AllowedClientHosts.h>
#include <Common/Exception.h>
#include <common/SimpleCache.h>
#include <Common/StringUtils/StringUtils.h>
#include <IO/ReadHelpers.h>
#include <Poco/Net/SocketAddress.h>
#include <Poco/RegularExpression.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include <boost/range/algorithm/find.hpp>
#include <boost/range/algorithm/find_first_of.hpp>
namespace DB
{
namespace ErrorCodes
{
extern const int DNS_ERROR;
extern const int IP_ADDRESS_NOT_ALLOWED;
}
namespace
{
using IPAddress = Poco::Net::IPAddress;
const AllowedClientHosts::IPSubnet ALL_ADDRESSES = AllowedClientHosts::IPSubnet{IPAddress{IPAddress::IPv6}, IPAddress{IPAddress::IPv6}};
IPAddress toIPv6(const IPAddress & addr)
{
if (addr.family() == IPAddress::IPv6)
return addr;
return IPAddress("::FFFF:" + addr.toString());
}
IPAddress maskToIPv6(const IPAddress & mask)
{
if (mask.family() == IPAddress::IPv6)
return mask;
return IPAddress(96, IPAddress::IPv6) | toIPv6(mask);
}
bool isAddressOfHostImpl(const IPAddress & address, const String & host)
{
IPAddress addr_v6 = toIPv6(address);
/// Resolve by hand, because Poco don't use AI_ALL flag but we need it.
addrinfo * ai = nullptr;
SCOPE_EXIT(
{
if (ai)
freeaddrinfo(ai);
});
addrinfo hints;
memset(&hints, 0, sizeof(hints));
hints.ai_family = AF_UNSPEC;
hints.ai_flags |= AI_V4MAPPED | AI_ALL;
int ret = getaddrinfo(host.c_str(), nullptr, &hints, &ai);
if (0 != ret)
throw Exception("Cannot getaddrinfo: " + std::string(gai_strerror(ret)), ErrorCodes::DNS_ERROR);
for (; ai != nullptr; ai = ai->ai_next)
{
if (ai->ai_addrlen && ai->ai_addr)
{
if (ai->ai_family == AF_INET6)
{
if (addr_v6 == IPAddress(
&reinterpret_cast<sockaddr_in6*>(ai->ai_addr)->sin6_addr, sizeof(in6_addr),
reinterpret_cast<sockaddr_in6*>(ai->ai_addr)->sin6_scope_id))
{
return true;
}
}
else if (ai->ai_family == AF_INET)
{
if (addr_v6 == toIPv6(IPAddress(&reinterpret_cast<sockaddr_in *>(ai->ai_addr)->sin_addr, sizeof(in_addr))))
{
return true;
}
}
}
}
return false;
}
/// Cached version of isAddressOfHostImpl(). We need to cache DNS requests.
bool isAddressOfHost(const IPAddress & address, const String & host)
{
static SimpleCache<decltype(isAddressOfHostImpl), isAddressOfHostImpl> cache;
return cache(address, host);
}
String getHostByAddressImpl(const IPAddress & address)
{
Poco::Net::SocketAddress sock_addr(address, 0);
/// Resolve by hand, because Poco library doesn't have such functionality.
char host[1024];
int gai_errno = getnameinfo(sock_addr.addr(), sock_addr.length(), host, sizeof(host), nullptr, 0, NI_NAMEREQD);
if (0 != gai_errno)
throw Exception("Cannot getnameinfo: " + std::string(gai_strerror(gai_errno)), ErrorCodes::DNS_ERROR);
/// Check that PTR record is resolved back to client address
if (!isAddressOfHost(address, host))
throw Exception("Host " + String(host) + " isn't resolved back to " + address.toString(), ErrorCodes::DNS_ERROR);
return host;
}
/// Cached version of getHostByAddressImpl(). We need to cache DNS requests.
String getHostByAddress(const IPAddress & address)
{
static SimpleCache<decltype(getHostByAddressImpl), &getHostByAddressImpl> cache;
return cache(address);
}
}
String AllowedClientHosts::IPSubnet::toString() const
{
unsigned int prefix_length = mask.prefixLength();
if (IPAddress{prefix_length, mask.family()} == mask)
return prefix.toString() + "/" + std::to_string(prefix_length);
return prefix.toString() + "/" + mask.toString();
}
AllowedClientHosts::AllowedClientHosts()
{
}
AllowedClientHosts::AllowedClientHosts(AllAddressesTag)
{
addAllAddresses();
}
AllowedClientHosts::~AllowedClientHosts() = default;
AllowedClientHosts::AllowedClientHosts(const AllowedClientHosts & src)
{
*this = src;
}
AllowedClientHosts & AllowedClientHosts::operator =(const AllowedClientHosts & src)
{
addresses = src.addresses;
subnets = src.subnets;
host_names = src.host_names;
host_regexps = src.host_regexps;
compiled_host_regexps.clear();
return *this;
}
AllowedClientHosts::AllowedClientHosts(AllowedClientHosts && src)
{
*this = src;
}
AllowedClientHosts & AllowedClientHosts::operator =(AllowedClientHosts && src)
{
addresses = std::move(src.addresses);
subnets = std::move(src.subnets);
host_names = std::move(src.host_names);
host_regexps = std::move(src.host_regexps);
compiled_host_regexps = std::move(src.compiled_host_regexps);
return *this;
}
void AllowedClientHosts::clear()
{
addresses.clear();
subnets.clear();
host_names.clear();
host_regexps.clear();
compiled_host_regexps.clear();
}
bool AllowedClientHosts::empty() const
{
return addresses.empty() && subnets.empty() && host_names.empty() && host_regexps.empty();
}
void AllowedClientHosts::addAddress(const IPAddress & address)
{
IPAddress addr_v6 = toIPv6(address);
if (boost::range::find(addresses, addr_v6) == addresses.end())
addresses.push_back(addr_v6);
}
void AllowedClientHosts::addAddress(const String & address)
{
addAddress(IPAddress{address});
}
void AllowedClientHosts::addSubnet(const IPSubnet & subnet)
{
IPSubnet subnet_v6;
subnet_v6.prefix = toIPv6(subnet.prefix);
subnet_v6.mask = maskToIPv6(subnet.mask);
if (subnet_v6.mask == IPAddress(128, IPAddress::IPv6))
{
addAddress(subnet_v6.prefix);
return;
}
subnet_v6.prefix = subnet_v6.prefix & subnet_v6.mask;
if (boost::range::find(subnets, subnet_v6) == subnets.end())
subnets.push_back(subnet_v6);
}
void AllowedClientHosts::addSubnet(const IPAddress & prefix, const IPAddress & mask)
{
addSubnet(IPSubnet{prefix, mask});
}
void AllowedClientHosts::addSubnet(const IPAddress & prefix, size_t num_prefix_bits)
{
addSubnet(prefix, IPAddress(num_prefix_bits, prefix.family()));
}
void AllowedClientHosts::addSubnet(const String & subnet)
{
size_t slash = subnet.find('/');
if (slash == String::npos)
{
addAddress(subnet);
return;
}
IPAddress prefix{String{subnet, 0, slash}};
String mask(subnet, slash + 1, subnet.length() - slash - 1);
if (std::all_of(mask.begin(), mask.end(), isNumericASCII))
addSubnet(prefix, parseFromString<UInt8>(mask));
else
addSubnet(prefix, IPAddress{mask});
}
void AllowedClientHosts::addHostName(const String & host_name)
{
if (boost::range::find(host_names, host_name) == host_names.end())
host_names.push_back(host_name);
}
void AllowedClientHosts::addHostRegexp(const String & host_regexp)
{
if (boost::range::find(host_regexps, host_regexp) == host_regexps.end())
host_regexps.push_back(host_regexp);
}
void AllowedClientHosts::addAllAddresses()
{
clear();
addSubnet(ALL_ADDRESSES);
}
bool AllowedClientHosts::containsAllAddresses() const
{
return (boost::range::find(subnets, ALL_ADDRESSES) != subnets.end())
|| (boost::range::find(host_regexps, ".*") != host_regexps.end())
|| (boost::range::find(host_regexps, "$") != host_regexps.end());
}
bool AllowedClientHosts::contains(const IPAddress & address) const
{
return containsImpl(address, String(), nullptr);
}
void AllowedClientHosts::checkContains(const IPAddress & address, const String & user_name) const
{
String error;
if (!containsImpl(address, user_name, &error))
throw Exception(error, ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
}
bool AllowedClientHosts::containsImpl(const IPAddress & address, const String & user_name, String * error) const
{
if (error)
error->clear();
/// Check `ip_addresses`.
IPAddress addr_v6 = toIPv6(address);
if (boost::range::find(addresses, addr_v6) != addresses.end())
return true;
/// Check `ip_subnets`.
for (const auto & subnet : subnets)
if ((addr_v6 & subnet.mask) == subnet.prefix)
return true;
/// Check `hosts`.
for (const String & host_name : host_names)
{
try
{
if (isAddressOfHost(address, host_name))
return true;
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::DNS_ERROR)
e.rethrow();
/// Try to ignore DNS errors: if host cannot be resolved, skip it and try next.
LOG_WARNING(
&Logger::get("AddressPatterns"),
"Failed to check if the allowed client hosts contain address " << address.toString() << ". " << e.displayText()
<< ", code = " << e.code());
}
}
/// Check `host_regexps`.
if (!host_regexps.empty())
{
compileRegexps();
try
{
String resolved_host = getHostByAddress(address);
for (const auto & compiled_regexp : compiled_host_regexps)
{
if (compiled_regexp && compiled_regexp->match(resolved_host))
return true;
}
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::DNS_ERROR)
e.rethrow();
/// Try to ignore DNS errors: if host cannot be resolved, skip it and try next.
LOG_WARNING(
&Logger::get("AddressPatterns"),
"Failed to check if the allowed client hosts contain address " << address.toString() << ". " << e.displayText()
<< ", code = " << e.code());
}
}
if (error)
{
if (user_name.empty())
*error = "It's not allowed to connect from address " + address.toString();
else
*error = "User " + user_name + " is not allowed to connect from address " + address.toString();
}
return false;
}
void AllowedClientHosts::compileRegexps() const
{
if (compiled_host_regexps.size() == host_regexps.size())
return;
size_t old_size = compiled_host_regexps.size();
compiled_host_regexps.reserve(host_regexps.size());
for (size_t i = old_size; i != host_regexps.size(); ++i)
compiled_host_regexps.emplace_back(std::make_unique<Poco::RegularExpression>(host_regexps[i]));
}
bool operator ==(const AllowedClientHosts & lhs, const AllowedClientHosts & rhs)
{
return (lhs.addresses == rhs.addresses) && (lhs.subnets == rhs.subnets) && (lhs.host_names == rhs.host_names)
&& (lhs.host_regexps == rhs.host_regexps);
}
}

View File

@ -0,0 +1,103 @@
#pragma once
#include <Core/Types.h>
#include <Poco/Net/IPAddress.h>
#include <memory>
#include <vector>
namespace Poco
{
class RegularExpression;
}
namespace DB
{
/// Represents lists of hosts an user is allowed to connect to server from.
class AllowedClientHosts
{
public:
using IPAddress = Poco::Net::IPAddress;
struct IPSubnet
{
IPAddress prefix;
IPAddress mask;
String toString() const;
friend bool operator ==(const IPSubnet & lhs, const IPSubnet & rhs) { return (lhs.prefix == rhs.prefix) && (lhs.mask == rhs.mask); }
friend bool operator !=(const IPSubnet & lhs, const IPSubnet & rhs) { return !(lhs == rhs); }
};
struct AllAddressesTag {};
AllowedClientHosts();
explicit AllowedClientHosts(AllAddressesTag);
~AllowedClientHosts();
AllowedClientHosts(const AllowedClientHosts & src);
AllowedClientHosts & operator =(const AllowedClientHosts & src);
AllowedClientHosts(AllowedClientHosts && src);
AllowedClientHosts & operator =(AllowedClientHosts && src);
/// Removes all contained addresses. This will disallow all addresses.
void clear();
bool empty() const;
/// Allows exact IP address.
/// For example, 213.180.204.3 or 2a02:6b8::3
void addAddress(const IPAddress & address);
void addAddress(const String & address);
/// Allows an IP subnet.
void addSubnet(const IPSubnet & subnet);
void addSubnet(const String & subnet);
/// Allows an IP subnet.
/// For example, 312.234.1.1/255.255.255.0 or 2a02:6b8::3/FFFF:FFFF:FFFF:FFFF::
void addSubnet(const IPAddress & prefix, const IPAddress & mask);
/// Allows an IP subnet.
/// For example, 10.0.0.1/8 or 2a02:6b8::3/64
void addSubnet(const IPAddress & prefix, size_t num_prefix_bits);
/// Allows all addresses.
void addAllAddresses();
/// Allows an exact host. The `contains()` function will check that the provided address equals to one of that host's addresses.
void addHostName(const String & host_name);
/// Allows a regular expression for the host.
void addHostRegexp(const String & host_regexp);
const std::vector<IPAddress> & getAddresses() const { return addresses; }
const std::vector<IPSubnet> & getSubnets() const { return subnets; }
const std::vector<String> & getHostNames() const { return host_names; }
const std::vector<String> & getHostRegexps() const { return host_regexps; }
/// Checks if the provided address is in the list. Returns false if not.
bool contains(const IPAddress & address) const;
/// Checks if any address is allowed.
bool containsAllAddresses() const;
/// Checks if the provided address is in the list. Throws an exception if not.
/// `username` is only used for generating an error message if the address isn't in the list.
void checkContains(const IPAddress & address, const String & user_name = String()) const;
friend bool operator ==(const AllowedClientHosts & lhs, const AllowedClientHosts & rhs);
friend bool operator !=(const AllowedClientHosts & lhs, const AllowedClientHosts & rhs) { return !(lhs == rhs); }
private:
bool containsImpl(const IPAddress & address, const String & user_name, String * error) const;
void compileRegexps() const;
std::vector<IPAddress> addresses;
std::vector<IPSubnet> subnets;
std::vector<String> host_names;
std::vector<String> host_regexps;
mutable std::vector<std::unique_ptr<Poco::RegularExpression>> compiled_host_regexps;
};
}

View File

@ -0,0 +1,207 @@
#include <Access/Authentication.h>
#include <Common/Exception.h>
#include <common/StringRef.h>
#include <Core/Defines.h>
#include <Poco/SHA1Engine.h>
#include <boost/algorithm/hex.hpp>
#include "config_core.h"
#if USE_SSL
# include <openssl/sha.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
extern const int REQUIRED_PASSWORD;
extern const int WRONG_PASSWORD;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
{
using Digest = Authentication::Digest;
Digest encodePlainText(const StringRef & text)
{
return Digest(text.data, text.data + text.size);
}
Digest encodeSHA256(const StringRef & text)
{
#if USE_SSL
Digest hash;
hash.resize(32);
SHA256_CTX ctx;
SHA256_Init(&ctx);
SHA256_Update(&ctx, reinterpret_cast<const UInt8 *>(text.data), text.size);
SHA256_Final(hash.data(), &ctx);
return hash;
#else
UNUSED(text);
throw DB::Exception("SHA256 passwords support is disabled, because ClickHouse was built without SSL library", DB::ErrorCodes::SUPPORT_IS_DISABLED);
#endif
}
Digest encodeSHA1(const StringRef & text)
{
Poco::SHA1Engine engine;
engine.update(text.data, text.size);
return engine.digest();
}
Digest encodeSHA1(const Digest & text)
{
return encodeSHA1(StringRef{reinterpret_cast<const char *>(text.data()), text.size()});
}
Digest encodeDoubleSHA1(const StringRef & text)
{
return encodeSHA1(encodeSHA1(text));
}
}
Authentication::Authentication(Authentication::Type type_)
: type(type_)
{
}
void Authentication::setPassword(const String & password_)
{
switch (type)
{
case NO_PASSWORD:
throw Exception("Cannot specify password for the 'NO_PASSWORD' authentication type", ErrorCodes::LOGICAL_ERROR);
case PLAINTEXT_PASSWORD:
setPasswordHashBinary(encodePlainText(password_));
return;
case SHA256_PASSWORD:
setPasswordHashBinary(encodeSHA256(password_));
return;
case DOUBLE_SHA1_PASSWORD:
setPasswordHashBinary(encodeDoubleSHA1(password_));
return;
}
throw Exception("Unknown authentication type: " + std::to_string(static_cast<int>(type)), ErrorCodes::LOGICAL_ERROR);
}
String Authentication::getPassword() const
{
if (type != PLAINTEXT_PASSWORD)
throw Exception("Cannot decode the password", ErrorCodes::LOGICAL_ERROR);
return String(password_hash.data(), password_hash.data() + password_hash.size());
}
void Authentication::setPasswordHashHex(const String & hash)
{
Digest digest;
digest.resize(hash.size() / 2);
boost::algorithm::unhex(hash.begin(), hash.end(), digest.data());
setPasswordHashBinary(digest);
}
String Authentication::getPasswordHashHex() const
{
String hex;
hex.resize(password_hash.size() * 2);
boost::algorithm::hex(password_hash.begin(), password_hash.end(), hex.data());
return hex;
}
void Authentication::setPasswordHashBinary(const Digest & hash)
{
switch (type)
{
case NO_PASSWORD:
throw Exception("Cannot specify password for the 'NO_PASSWORD' authentication type", ErrorCodes::LOGICAL_ERROR);
case PLAINTEXT_PASSWORD:
{
password_hash = hash;
return;
}
case SHA256_PASSWORD:
{
if (hash.size() != 32)
throw Exception(
"Password hash for the 'SHA256_PASSWORD' authentication type has length " + std::to_string(hash.size())
+ " but must be exactly 32 bytes.",
ErrorCodes::BAD_ARGUMENTS);
password_hash = hash;
return;
}
case DOUBLE_SHA1_PASSWORD:
{
if (hash.size() != 20)
throw Exception(
"Password hash for the 'DOUBLE_SHA1_PASSWORD' authentication type has length " + std::to_string(hash.size())
+ " but must be exactly 20 bytes.",
ErrorCodes::BAD_ARGUMENTS);
password_hash = hash;
return;
}
}
throw Exception("Unknown authentication type: " + std::to_string(static_cast<int>(type)), ErrorCodes::LOGICAL_ERROR);
}
bool Authentication::isCorrectPassword(const String & password_) const
{
switch (type)
{
case NO_PASSWORD:
return true;
case PLAINTEXT_PASSWORD:
return password_ == StringRef{reinterpret_cast<const char *>(password_hash.data()), password_hash.size()};
case SHA256_PASSWORD:
return encodeSHA256(password_) == password_hash;
case DOUBLE_SHA1_PASSWORD:
{
auto first_sha1 = encodeSHA1(password_);
/// If it was MySQL compatibility server, then first_sha1 already contains double SHA1.
if (first_sha1 == password_hash)
return true;
return encodeSHA1(first_sha1) == password_hash;
}
}
throw Exception("Unknown authentication type: " + std::to_string(static_cast<int>(type)), ErrorCodes::LOGICAL_ERROR);
}
void Authentication::checkPassword(const String & password_, const String & user_name) const
{
if (isCorrectPassword(password_))
return;
auto info_about_user_name = [&user_name]() { return user_name.empty() ? String() : " for user " + user_name; };
if (password_.empty() && (type != NO_PASSWORD))
throw Exception("Password required" + info_about_user_name(), ErrorCodes::REQUIRED_PASSWORD);
throw Exception("Wrong password" + info_about_user_name(), ErrorCodes::WRONG_PASSWORD);
}
bool operator ==(const Authentication & lhs, const Authentication & rhs)
{
return (lhs.type == rhs.type) && (lhs.password_hash == rhs.password_hash);
}
}

View File

@ -0,0 +1,66 @@
#pragma once
#include <Core/Types.h>
namespace DB
{
/// Authentication type and encrypted password for checking when an user logins.
class Authentication
{
public:
enum Type
{
/// User doesn't have to enter password.
NO_PASSWORD,
/// Password is stored as is.
PLAINTEXT_PASSWORD,
/// Password is encrypted in SHA256 hash.
SHA256_PASSWORD,
/// SHA1(SHA1(password)).
/// This kind of hash is used by the `mysql_native_password` authentication plugin.
DOUBLE_SHA1_PASSWORD,
};
using Digest = std::vector<UInt8>;
Authentication(Authentication::Type type = NO_PASSWORD);
Authentication(const Authentication & src) = default;
Authentication & operator =(const Authentication & src) = default;
Authentication(Authentication && src) = default;
Authentication & operator =(Authentication && src) = default;
Type getType() const { return type; }
/// Sets the password and encrypt it using the authentication type set in the constructor.
void setPassword(const String & password);
/// Returns the password. Allowed to use only for Type::PLAINTEXT_PASSWORD.
String getPassword() const;
/// Sets the password as a string of hexadecimal digits.
void setPasswordHashHex(const String & hash);
String getPasswordHashHex() const;
/// Sets the password in binary form.
void setPasswordHashBinary(const Digest & hash);
const Digest & getPasswordHashBinary() const { return password_hash; }
/// Checks if the provided password is correct. Returns false if not.
bool isCorrectPassword(const String & password) const;
/// Checks if the provided password is correct. Throws an exception if not.
/// `user_name` is only used for generating an error message if the password is incorrect.
void checkPassword(const String & password, const String & user_name = String()) const;
friend bool operator ==(const Authentication & lhs, const Authentication & rhs);
friend bool operator !=(const Authentication & lhs, const Authentication & rhs) { return !(lhs == rhs); }
private:
Type type = Type::NO_PASSWORD;
Digest password_hash;
};
}

View File

View File

@ -12,8 +12,8 @@ namespace
AggregateFunctionPtr createAggregateFunctionCount(const std::string & name, const DataTypes & argument_types, const Array & parameters)
{
assertNoParameters(name, parameters);
assertArityAtMost<1>(name, argument_types);
/// 'count' accept any number of arguments and (in this case of non-Nullable types) simply ignore them.
return std::make_shared<AggregateFunctionCount>(argument_types);
}

View File

@ -113,69 +113,4 @@ public:
const char * getHeaderFilePath() const override { return __FILE__; }
};
/// Count number of calls where all arguments are not NULL.
class AggregateFunctionCountNotNullVariadic final : public IAggregateFunctionDataHelper<AggregateFunctionCountData, AggregateFunctionCountNotNullVariadic>
{
public:
AggregateFunctionCountNotNullVariadic(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper<AggregateFunctionCountData, AggregateFunctionCountNotNullVariadic>(arguments, params)
{
number_of_arguments = arguments.size();
if (number_of_arguments == 1)
throw Exception("Logical error: single argument is passed to AggregateFunctionCountNotNullVariadic", ErrorCodes::LOGICAL_ERROR);
if (number_of_arguments > MAX_ARGS)
throw Exception("Maximum number of arguments for aggregate function with Nullable types is " + toString(size_t(MAX_ARGS)),
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
for (size_t i = 0; i < number_of_arguments; ++i)
is_nullable[i] = arguments[i]->isNullable();
}
String getName() const override { return "count"; }
DataTypePtr getReturnType() const override
{
return std::make_shared<DataTypeUInt64>();
}
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
for (size_t i = 0; i < number_of_arguments; ++i)
if (is_nullable[i] && assert_cast<const ColumnNullable &>(*columns[i]).isNullAt(row_num))
return;
++data(place).count;
}
void merge(AggregateDataPtr place, ConstAggregateDataPtr rhs, Arena *) const override
{
data(place).count += data(rhs).count;
}
void serialize(ConstAggregateDataPtr place, WriteBuffer & buf) const override
{
writeVarUInt(data(place).count, buf);
}
void deserialize(AggregateDataPtr place, ReadBuffer & buf, Arena *) const override
{
readVarUInt(data(place).count, buf);
}
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
assert_cast<ColumnUInt64 &>(to).getData().push_back(data(place).count);
}
const char * getHeaderFilePath() const override { return __FILE__; }
private:
enum { MAX_ARGS = 8 };
size_t number_of_arguments = 0;
std::array<char, MAX_ARGS> is_nullable; /// Plain array is better than std::vector due to one indirection less.
};
}

View File

@ -1,9 +1,12 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionGroupBitmap.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeAggregateFunction.h>
// TODO include this last because of a broken roaring header. See the comment
// inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmap.h>
namespace DB
{
@ -36,15 +39,13 @@ AggregateFunctionPtr createAggregateFunctionBitmapL2(const std::string & name, c
assertUnary(name, argument_types);
DataTypePtr argument_type_ptr = argument_types[0];
WhichDataType which(*argument_type_ptr);
if (which.idx == TypeIndex::AggregateFunction)
{
const DataTypeAggregateFunction& datatype_aggfunc = dynamic_cast<const DataTypeAggregateFunction&>(*argument_type_ptr);
AggregateFunctionPtr aggfunc = datatype_aggfunc.getFunction();
argument_type_ptr = aggfunc->getArgumentTypes()[0];
}
if (which.idx != TypeIndex::AggregateFunction)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const DataTypeAggregateFunction& datatype_aggfunc = dynamic_cast<const DataTypeAggregateFunction&>(*argument_type_ptr);
AggregateFunctionPtr aggfunc = datatype_aggfunc.getFunction();
argument_type_ptr = aggfunc->getArgumentTypes()[0];
AggregateFunctionPtr res(createWithUnsignedIntegerType<AggregateFunctionTemplate, AggregateFunctionGroupBitmapData>(*argument_type_ptr, argument_type_ptr));
if (!res)
throw Exception("Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name, ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);

View File

@ -3,10 +3,13 @@
#include <Columns/ColumnVector.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionGroupBitmapData.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnAggregateFunction.h>
// TODO include this last because of a broken roaring header. See the comment
// inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmapData.h>
namespace DB
{
@ -71,7 +74,7 @@ public:
void add(AggregateDataPtr place, const IColumn ** columns, size_t row_num, Arena *) const override
{
Data & data_lhs = this->data(place);
const Data & data_rhs = this->data(static_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);
const Data & data_rhs = this->data(assert_cast<const ColumnAggregateFunction &>(*columns[0]).getData()[row_num]);
if (!data_lhs.doneFirst)
{
data_lhs.doneFirst = true;
@ -110,7 +113,7 @@ public:
void insertResultInto(ConstAggregateDataPtr place, IColumn & to) const override
{
static_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
assert_cast<ColumnVector<T> &>(to).getData().push_back(this->data(place).rbs.size());
}
const char * getHeaderFilePath() const override { return __FILE__; }

View File

@ -1,14 +1,18 @@
#pragma once
#include <algorithm>
#include <roaring/roaring.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/noncopyable.hpp>
#include <roaring/roaring.hh>
#include <Common/HashTable/SmallTable.h>
#include <Common/PODArray.h>
// Include this header last, because it is an auto-generated dump of questionable
// garbage that breaks the build (e.g. it changes _POSIX_C_SOURCE).
// TODO: find out what it is. On github, they have proper inteface headers like
// this one: https://github.com/RoaringBitmap/CRoaring/blob/master/include/roaring/roaring.h
#include <roaring/roaring.h>
namespace DB
{
/**

View File

@ -53,12 +53,7 @@ public:
/// Special case for 'count' function. It could be called with Nullable arguments
/// - that means - count number of calls, when all arguments are not NULL.
if (nested_function && nested_function->getName() == "count")
{
if (arguments.size() == 1)
return std::make_shared<AggregateFunctionCountNotNullUnary>(arguments[0], params);
else
return std::make_shared<AggregateFunctionCountNotNullVariadic>(arguments, params);
}
return std::make_shared<AggregateFunctionCountNotNullUnary>(arguments[0], params);
if (has_null_types)
return std::make_shared<AggregateFunctionNothing>(arguments, params);

View File

@ -0,0 +1,39 @@
#include <AggregateFunctions/AggregateFunctionOrFill.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
namespace DB
{
template <bool UseNull>
class AggregateFunctionCombinatorOrFill final : public IAggregateFunctionCombinator
{
public:
String getName() const override
{
if constexpr (UseNull)
return "OrNull";
else
return "OrDefault";
}
AggregateFunctionPtr transformAggregateFunction(
const AggregateFunctionPtr & nested_function,
const DataTypes & arguments,
const Array & params) const override
{
return std::make_shared<AggregateFunctionOrFill<UseNull>>(
nested_function,
arguments,
params);
}
};
void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory & factory)
{
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorOrFill<false>>());
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorOrFill<true>>());
}
}

View File

@ -0,0 +1,179 @@
#pragma once
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/ColumnNullable.h>
#include <Common/typeid_cast.h>
#include <DataTypes/DataTypeNullable.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ARGUMENT_OUT_OF_BOUND;
}
/**
* -OrDefault and -OrNull combinators for aggregate functions.
* If there are no input values, return NULL or a default value, accordingly.
* Use a single additional byte of data after the nested function data:
* 0 means there was no input, 1 means there was some.
*/
template <bool UseNull>
class AggregateFunctionOrFill final : public IAggregateFunctionHelper<AggregateFunctionOrFill<UseNull>>
{
private:
AggregateFunctionPtr nested_function;
size_t size_of_data;
DataTypePtr inner_type;
bool inner_nullable;
public:
AggregateFunctionOrFill(AggregateFunctionPtr nested_function_, const DataTypes & arguments, const Array & params)
: IAggregateFunctionHelper<AggregateFunctionOrFill>{arguments, params}
, nested_function{nested_function_}
, size_of_data {nested_function->sizeOfData()}
, inner_type {nested_function->getReturnType()}
, inner_nullable {inner_type->isNullable()}
{
// nothing
}
String getName() const override
{
if constexpr (UseNull)
return nested_function->getName() + "OrNull";
else
return nested_function->getName() + "OrDefault";
}
const char * getHeaderFilePath() const override
{
return __FILE__;
}
bool isState() const override
{
return nested_function->isState();
}
bool allocatesMemoryInArena() const override
{
return nested_function->allocatesMemoryInArena();
}
bool hasTrivialDestructor() const override
{
return nested_function->hasTrivialDestructor();
}
size_t sizeOfData() const override
{
return size_of_data + sizeof(char);
}
size_t alignOfData() const override
{
return nested_function->alignOfData();
}
void create(AggregateDataPtr place) const override
{
nested_function->create(place);
place[size_of_data] = 0;
}
void destroy(AggregateDataPtr place) const noexcept override
{
nested_function->destroy(place);
}
void add(
AggregateDataPtr place,
const IColumn ** columns,
size_t row_num,
Arena * arena) const override
{
nested_function->add(place, columns, row_num, arena);
place[size_of_data] = 1;
}
void merge(
AggregateDataPtr place,
ConstAggregateDataPtr rhs,
Arena * arena) const override
{
nested_function->merge(place, rhs, arena);
}
void serialize(
ConstAggregateDataPtr place,
WriteBuffer & buf) const override
{
nested_function->serialize(place, buf);
}
void deserialize(
AggregateDataPtr place,
ReadBuffer & buf,
Arena * arena) const override
{
nested_function->deserialize(place, buf, arena);
}
DataTypePtr getReturnType() const override
{
if constexpr (UseNull)
{
// -OrNull
if (inner_nullable)
return inner_type;
return std::make_shared<DataTypeNullable>(inner_type);
}
else
{
// -OrDefault
return inner_type;
}
}
void insertResultInto(
ConstAggregateDataPtr place,
IColumn & to) const override
{
if (place[size_of_data])
{
if constexpr (UseNull)
{
// -OrNull
if (inner_nullable)
nested_function->insertResultInto(place, to);
else
{
ColumnNullable & col = typeid_cast<ColumnNullable &>(to);
col.getNullMapColumn().insertDefault();
nested_function->insertResultInto(place, col.getNestedColumn());
}
}
else
{
// -OrDefault
nested_function->insertResultInto(place, to);
}
}
else
to.insertDefault();
}
};
}

View File

@ -29,8 +29,8 @@ private:
size_t step;
size_t total;
size_t aod;
size_t sod;
size_t align_of_data;
size_t size_of_data;
public:
AggregateFunctionResample(
@ -47,8 +47,8 @@ public:
, end{end_}
, step{step_}
, total{0}
, aod{nested_function->alignOfData()}
, sod{(nested_function->sizeOfData() + aod - 1) / aod * aod}
, align_of_data{nested_function->alignOfData()}
, size_of_data{(nested_function->sizeOfData() + align_of_data - 1) / align_of_data * align_of_data}
{
// notice: argument types has been checked before
if (step == 0)
@ -94,24 +94,24 @@ public:
size_t sizeOfData() const override
{
return total * sod;
return total * size_of_data;
}
size_t alignOfData() const override
{
return aod;
return align_of_data;
}
void create(AggregateDataPtr place) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->create(place + i * sod);
nested_function->create(place + i * size_of_data);
}
void destroy(AggregateDataPtr place) const noexcept override
{
for (size_t i = 0; i < total; ++i)
nested_function->destroy(place + i * sod);
nested_function->destroy(place + i * size_of_data);
}
void add(
@ -132,7 +132,7 @@ public:
size_t pos = (key - begin) / step;
nested_function->add(place + pos * sod, columns, row_num, arena);
nested_function->add(place + pos * size_of_data, columns, row_num, arena);
}
void merge(
@ -141,7 +141,7 @@ public:
Arena * arena) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->merge(place + i * sod, rhs + i * sod, arena);
nested_function->merge(place + i * size_of_data, rhs + i * size_of_data, arena);
}
void serialize(
@ -149,7 +149,7 @@ public:
WriteBuffer & buf) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->serialize(place + i * sod, buf);
nested_function->serialize(place + i * size_of_data, buf);
}
void deserialize(
@ -158,7 +158,7 @@ public:
Arena * arena) const override
{
for (size_t i = 0; i < total; ++i)
nested_function->deserialize(place + i * sod, buf, arena);
nested_function->deserialize(place + i * size_of_data, buf, arena);
}
DataTypePtr getReturnType() const override
@ -174,7 +174,7 @@ public:
auto & col_offsets = assert_cast<ColumnArray::ColumnOffsets &>(col.getOffsetsColumn());
for (size_t i = 0; i < total; ++i)
nested_function->insertResultInto(place + i * sod, col.getData());
nested_function->insertResultInto(place + i * size_of_data, col.getData());
col_offsets.getData().push_back(col.getData().size());
}

View File

@ -2,6 +2,7 @@
#include <Core/Field.h>
#include <DataTypes/IDataType.h>
#include <IO/WriteHelpers.h>
namespace DB
@ -31,4 +32,22 @@ inline void assertBinary(const std::string & name, const DataTypes & argument_ty
throw Exception("Aggregate function " + name + " require two arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
template<std::size_t maximal_arity>
inline void assertArityAtMost(const std::string & name, const DataTypes & argument_types)
{
if (argument_types.size() <= maximal_arity)
return;
if constexpr (maximal_arity == 0)
throw Exception("Aggregate function " + name + " cannot have arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if constexpr (maximal_arity == 1)
throw Exception("Aggregate function " + name + " requires zero or one argument",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
throw Exception("Aggregate function " + name + " requires at most " + toString(maximal_arity) + " arguments",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
}
}

View File

@ -42,6 +42,7 @@ void registerAggregateFunctionCombinatorForEach(AggregateFunctionCombinatorFacto
void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorMerge(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorNull(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorOrFill(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctionCombinatorResample(AggregateFunctionCombinatorFactory &);
void registerAggregateFunctions()
@ -88,6 +89,7 @@ void registerAggregateFunctions()
registerAggregateFunctionCombinatorState(factory);
registerAggregateFunctionCombinatorMerge(factory);
registerAggregateFunctionCombinatorNull(factory);
registerAggregateFunctionCombinatorOrFill(factory);
registerAggregateFunctionCombinatorResample(factory);
}
}

View File

@ -1,3 +1,4 @@
add_subdirectory (Access)
add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)

View File

@ -3,6 +3,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/Net/DNS.h>
#include <Common/BitHelpers.h>
#include <Common/getFQDNOrHostName.h>
#include <Common/isLocalAddress.h>
#include <Common/ProfileEvents.h>

View File

@ -3,6 +3,7 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
#include <Core/Field.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnVector.h>
#include <Core/Defines.h>
#include <Common/typeid_cast.h>

View File

@ -2,6 +2,7 @@
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsCommon.h>
#include <Common/PODArray.h>
#include <Common/typeid_cast.h>

View File

@ -4,6 +4,7 @@
#include <Common/typeid_cast.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnVectorHelper.h>
#include <Core/Field.h>

View File

@ -5,6 +5,7 @@
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnVectorHelper.h>
#include <Core/Field.h>

View File

@ -1,6 +1,7 @@
#include <Interpreters/ExpressionActions.h>
#include <Columns/ColumnFunction.h>
#include <Columns/ColumnsCommon.h>
#include <Common/PODArray.h>
#include <IO/WriteHelpers.h>
#include <Functions/IFunction.h>

View File

@ -1,6 +1,7 @@
#pragma once
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnsNumber.h>
#include <Common/typeid_cast.h>
#include <Common/assert_cast.h>

View File

@ -4,6 +4,7 @@
#include <cassert>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Common/PODArray.h>
#include <Common/SipHash.h>
#include <Common/memcpySmall.h>

View File

@ -94,16 +94,17 @@ MutableColumnPtr ColumnTuple::cloneResized(size_t new_size) const
Field ColumnTuple::operator[](size_t n) const
{
return Tuple{ext::map<TupleBackend>(columns, [n] (const auto & column) { return (*column)[n]; })};
return ext::map<Tuple>(columns, [n] (const auto & column) { return (*column)[n]; });
}
void ColumnTuple::get(size_t n, Field & res) const
{
const size_t tuple_size = columns.size();
res = Tuple(TupleBackend(tuple_size));
TupleBackend & res_arr = DB::get<Tuple &>(res).toUnderType();
Tuple tuple(tuple_size);
for (const auto i : ext::range(0, tuple_size))
columns[i]->get(n, res_arr[i]);
columns[i]->get(n, tuple[i]);
res = tuple;
}
StringRef ColumnTuple::getDataAt(size_t) const
@ -118,7 +119,7 @@ void ColumnTuple::insertData(const char *, size_t)
void ColumnTuple::insert(const Field & x)
{
const TupleBackend & tuple = DB::get<const Tuple &>(x).toUnderType();
auto & tuple = DB::get<const Tuple &>(x);
const size_t tuple_size = columns.size();
if (tuple.size() != tuple_size)
@ -352,14 +353,14 @@ void ColumnTuple::getExtremes(Field & min, Field & max) const
{
const size_t tuple_size = columns.size();
min = Tuple(TupleBackend(tuple_size));
max = Tuple(TupleBackend(tuple_size));
auto & min_backend = min.get<Tuple &>().toUnderType();
auto & max_backend = max.get<Tuple &>().toUnderType();
Tuple min_tuple(tuple_size);
Tuple max_tuple(tuple_size);
for (const auto i : ext::range(0, tuple_size))
columns[i]->getExtremes(min_backend[i], max_backend[i]);
columns[i]->getExtremes(min_tuple[i], max_tuple[i]);
min = min_tuple;
max = max_tuple;
}
void ColumnTuple::forEachSubcolumn(ColumnCallback callback)

View File

@ -2,6 +2,7 @@
#include <cmath>
#include <Columns/IColumn.h>
#include <Columns/IColumnImpl.h>
#include <Columns/ColumnVectorHelper.h>
#include <common/unaligned.h>
#include <Core/Field.h>

View File

@ -1,10 +1,11 @@
#pragma once
#include <Common/COW.h>
#include <Common/PODArray.h>
#include <Common/PODArray_fwd.h>
#include <Common/Exception.h>
#include <Common/typeid_cast.h>
#include <common/StringRef.h>
#include <Core/Types.h>
class SipHash;
@ -373,32 +374,7 @@ protected:
/// Template is to devirtualize calls to insertFrom method.
/// In derived classes (that use final keyword), implement scatter method as call to scatterImpl.
template <typename Derived>
std::vector<MutablePtr> scatterImpl(ColumnIndex num_columns, const Selector & selector) const
{
size_t num_rows = size();
if (num_rows != selector.size())
throw Exception(
"Size of selector: " + std::to_string(selector.size()) + " doesn't match size of column: " + std::to_string(num_rows),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
std::vector<MutablePtr> columns(num_columns);
for (auto & column : columns)
column = cloneEmpty();
{
size_t reserve_size = num_rows * 1.1 / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);
return columns;
}
std::vector<MutablePtr> scatterImpl(ColumnIndex num_columns, const Selector & selector) const;
};
using ColumnPtr = IColumn::Ptr;

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/Arena.h>
#include <Common/PODArray.h>
#include <Columns/IColumn.h>
#include <Columns/ColumnsCommon.h>

View File

@ -0,0 +1,45 @@
/**
* This file implements template methods of IColumn that depend on other types
* we don't want to include.
* Currently, this is only the scatterImpl method that depends on PODArray
* implementation.
*/
#pragma once
#include <Columns/IColumn.h>
#include <Common/PODArray.h>
namespace DB
{
template <typename Derived>
std::vector<IColumn::MutablePtr> IColumn::scatterImpl(ColumnIndex num_columns,
const Selector & selector) const
{
size_t num_rows = size();
if (num_rows != selector.size())
throw Exception(
"Size of selector: " + std::to_string(selector.size()) + " doesn't match size of column: " + std::to_string(num_rows),
ErrorCodes::SIZES_OF_COLUMNS_DOESNT_MATCH);
std::vector<MutablePtr> columns(num_columns);
for (auto & column : columns)
column = cloneEmpty();
{
size_t reserve_size = num_rows * 1.1 / num_columns; /// 1.1 is just a guess. Better to use n-sigma rule.
if (reserve_size > 1)
for (auto & column : columns)
column->reserve(reserve_size);
}
for (size_t i = 0; i < num_rows; ++i)
static_cast<Derived &>(*columns[selector[i]]).insertFrom(*this, i);
return columns;
}
}

View File

@ -30,6 +30,8 @@
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <Common/Allocator_fwd.h>
/// Required for older Darwin builds, that lack definition of MAP_ANONYMOUS
#ifndef MAP_ANONYMOUS
@ -84,7 +86,7 @@ namespace ErrorCodes
* - random hint address for mmap
* - mmap_threshold for using mmap less or more
*/
template <bool clear_memory_, bool mmap_populate = false>
template <bool clear_memory_, bool mmap_populate>
class Allocator
{
public:
@ -270,7 +272,7 @@ private:
/** Allocator with optimization to place small memory ranges in automatic memory.
*/
template <typename Base, size_t N = 64, size_t Alignment = 1>
template <typename Base, size_t N, size_t Alignment>
class AllocatorWithStackMemory : private Base
{
private:

View File

@ -0,0 +1,10 @@
/**
* This file provides forward declarations for Allocator.
*/
#pragma once
template <bool clear_memory_, bool mmap_populate = false>
class Allocator;
template <typename Base, size_t N = 64, size_t Alignment = 1>
class AllocatorWithStackMemory;

View File

@ -1,6 +1,6 @@
#include <Common/DiskSpaceMonitor.h>
#include <Common/escapeForFileName.h>
#include <IO/WriteHelpers.h>
#include <Common/quoteString.h>
#include <set>

View File

@ -72,9 +72,8 @@ String FieldVisitorDump::operator() (const Array & x) const
return wb.str();
}
String FieldVisitorDump::operator() (const Tuple & x_def) const
String FieldVisitorDump::operator() (const Tuple & x) const
{
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
wb << "Tuple_(";
@ -149,9 +148,8 @@ String FieldVisitorToString::operator() (const Array & x) const
return wb.str();
}
String FieldVisitorToString::operator() (const Tuple & x_def) const
String FieldVisitorToString::operator() (const Tuple & x) const
{
auto & x = x_def.toUnderType();
WriteBufferFromOwnString wb;
wb << '(';
@ -211,6 +209,16 @@ void FieldVisitorHash::operator() (const String & x) const
hash.update(x.data(), x.size());
}
void FieldVisitorHash::operator() (const Tuple & x) const
{
UInt8 type = Field::Types::Tuple;
hash.update(type);
hash.update(x.size());
for (const auto & elem : x)
applyVisitor(*this, elem);
}
void FieldVisitorHash::operator() (const Array & x) const
{
UInt8 type = Field::Types::Array;

View File

@ -231,6 +231,7 @@ public:
void operator() (const Float64 & x) const;
void operator() (const String & x) const;
void operator() (const Array & x) const;
void operator() (const Tuple & x) const;
void operator() (const DecimalField<Decimal32> & x) const;
void operator() (const DecimalField<Decimal64> & x) const;
void operator() (const DecimalField<Decimal128> & x) const;
@ -479,6 +480,7 @@ public:
bool operator() (Null &) const { throw Exception("Cannot sum Nulls", ErrorCodes::LOGICAL_ERROR); }
bool operator() (String &) const { throw Exception("Cannot sum Strings", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Array &) const { throw Exception("Cannot sum Arrays", ErrorCodes::LOGICAL_ERROR); }
bool operator() (Tuple &) const { throw Exception("Cannot sum Tuples", ErrorCodes::LOGICAL_ERROR); }
bool operator() (UInt128 &) const { throw Exception("Cannot sum UUIDs", ErrorCodes::LOGICAL_ERROR); }
bool operator() (AggregateFunctionStateData &) const { throw Exception("Cannot sum AggregateFunctionStates", ErrorCodes::LOGICAL_ERROR); }

View File

@ -1,5 +1,10 @@
#pragma once
#ifdef __clang__
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wreserved-id-macro"
#endif
#define __msan_unpoison(X, Y)
#define __msan_test_shadow(X, Y) (false)
#define __msan_print_shadow(X, Y)
@ -11,3 +16,7 @@
# include <sanitizer/msan_interface.h>
# endif
#endif
#ifdef __clang__
#pragma clang diagnostic pop
#endif

View File

@ -21,6 +21,8 @@
#include <sys/mman.h>
#endif
#include <Common/PODArray_fwd.h>
namespace DB
{
@ -30,11 +32,6 @@ namespace ErrorCodes
extern const int CANNOT_MPROTECT;
}
inline constexpr size_t integerRoundUp(size_t value, size_t dividend)
{
return ((value + dividend - 1) / dividend) * dividend;
}
/** A dynamic array for POD types.
* Designed for a small number of large arrays (rather than a lot of small ones).
* To be more precise - for use in ColumnVector.
@ -258,7 +255,7 @@ public:
}
};
template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>, size_t pad_right_ = 0, size_t pad_left_ = 0>
template <typename T, size_t initial_bytes, typename TAllocator, size_t pad_right_, size_t pad_left_>
class PODArray : public PODArrayBase<sizeof(T), initial_bytes, TAllocator, pad_right_, pad_left_>
{
protected:
@ -625,17 +622,5 @@ void swap(PODArray<T, initial_bytes, TAllocator, pad_right_> & lhs, PODArray<T,
lhs.swap(rhs);
}
/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */
template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>>
using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 15, 16>;
/** A helper for declaring PODArray that uses inline memory.
* The initial size is set to use all the inline bytes, since using less would
* only add some extra allocation calls.
*/
template <typename T, size_t inline_bytes,
size_t rounded_bytes = integerRoundUp(inline_bytes, sizeof(T))>
using PODArrayWithStackMemory = PODArray<T, rounded_bytes,
AllocatorWithStackMemory<Allocator<false>, rounded_bytes, alignof(T)>>;
}

View File

@ -0,0 +1,35 @@
/**
* This file contains some using-declarations that define various kinds of
* PODArray.
*/
#pragma once
#include <Common/Allocator_fwd.h>
namespace DB
{
inline constexpr size_t integerRoundUp(size_t value, size_t dividend)
{
return ((value + dividend - 1) / dividend) * dividend;
}
template <typename T, size_t initial_bytes = 4096,
typename TAllocator = Allocator<false>, size_t pad_right_ = 0,
size_t pad_left_ = 0>
class PODArray;
/** For columns. Padding is enough to read and write xmm-register at the address of the last element. */
template <typename T, size_t initial_bytes = 4096, typename TAllocator = Allocator<false>>
using PaddedPODArray = PODArray<T, initial_bytes, TAllocator, 15, 16>;
/** A helper for declaring PODArray that uses inline memory.
* The initial size is set to use all the inline bytes, since using less would
* only add some extra allocation calls.
*/
template <typename T, size_t inline_bytes,
size_t rounded_bytes = integerRoundUp(inline_bytes, sizeof(T))>
using PODArrayWithStackMemory = PODArray<T, rounded_bytes,
AllocatorWithStackMemory<Allocator<false>, rounded_bytes, alignof(T)>>;
}

View File

@ -121,13 +121,13 @@ ReturnType ThreadPoolImpl<Thread>::scheduleImpl(Job job, int priority, std::opti
}
template <typename Thread>
void ThreadPoolImpl<Thread>::schedule(Job job, int priority)
void ThreadPoolImpl<Thread>::scheduleOrThrowOnError(Job job, int priority)
{
scheduleImpl<void>(std::move(job), priority, std::nullopt);
}
template <typename Thread>
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds)
bool ThreadPoolImpl<Thread>::trySchedule(Job job, int priority, uint64_t wait_microseconds) noexcept
{
return scheduleImpl<bool>(std::move(job), priority, wait_microseconds);
}

View File

@ -36,18 +36,23 @@ public:
ThreadPoolImpl(size_t max_threads_, size_t max_free_threads_, size_t queue_size_);
/// Add new job. Locks until number of scheduled jobs is less than maximum or exception in one of threads was thrown.
/// If an exception in some thread was thrown, method silently returns, and exception will be rethrown only on call to 'wait' function.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
/// Also throws an exception if cannot create thread.
/// Priority: greater is higher.
void schedule(Job job, int priority = 0);
/// NOTE: Probably you should call wait() if exception was thrown. If some previously scheduled jobs are using some objects,
/// located on stack of current thread, the stack must not be unwinded until all jobs finished. However,
/// if ThreadPool is a local object, it will wait for all scheduled jobs in own destructor.
void scheduleOrThrowOnError(Job job, int priority = 0);
/// Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or return false.
bool trySchedule(Job job, int priority = 0, uint64_t wait_microseconds = 0) noexcept;
/// Wait for specified amount of time and schedule a job or throw an exception.
/// Similar to scheduleOrThrowOnError(...). Wait for specified amount of time and schedule a job or throw an exception.
void scheduleOrThrow(Job job, int priority = 0, uint64_t wait_microseconds = 0);
/// Wait for all currently active jobs to be done.
/// You may call schedule and wait many times in arbitary order.
/// You may call schedule and wait many times in arbitrary order.
/// If any thread was throw an exception, first exception will be rethrown from this method,
/// and exception will be cleared.
void wait();
@ -140,7 +145,7 @@ public:
explicit ThreadFromGlobalPool(Function && func, Args &&... args)
: state(std::make_shared<Poco::Event>())
{
/// NOTE: If this will throw an exception, the descructor won't be called.
/// NOTE: If this will throw an exception, the destructor won't be called.
GlobalThreadPool::instance().scheduleOrThrow([
state = state,
func = std::forward<Function>(func),

View File

@ -1,8 +1,13 @@
#pragma once
#include <string>
#include <IO/WriteBuffer.h>
namespace DB
{
class WriteBuffer;
}
/// Displays the passed size in bytes as 123.45 GiB.
void formatReadableSizeWithBinarySuffix(double value, DB::WriteBuffer & out, int precision = 2);

View File

@ -24,7 +24,7 @@ std::vector<std::string> getMultipleValuesFromConfig(const Poco::Util::AbstractC
{
std::vector<std::string> values;
for (const auto & key : DB::getMultipleKeysFromConfig(config, root, name))
values.emplace_back(config.getString(key));
values.emplace_back(config.getString(root.empty() ? key : root + "." + key));
return values;
}

View File

@ -0,0 +1,37 @@
#include <Common/quoteString.h>
#include <IO/WriteHelpers.h>
#include <IO/WriteBufferFromString.h>
namespace DB
{
String quoteString(const StringRef & x)
{
String res(x.size, '\0');
WriteBufferFromString wb(res);
writeQuotedString(x, wb);
return res;
}
String backQuote(const StringRef & x)
{
String res(x.size, '\0');
{
WriteBufferFromString wb(res);
writeBackQuotedString(x, wb);
}
return res;
}
String backQuoteIfNeed(const StringRef & x)
{
String res(x.size, '\0');
{
WriteBufferFromString wb(res);
writeProbablyBackQuotedString(x, wb);
}
return res;
}
}

View File

@ -0,0 +1,17 @@
#pragma once
#include <Core/Types.h>
#include <common/StringRef.h>
namespace DB
{
/// Quote the string.
String quoteString(const StringRef & x);
/// Quote the identifier with backquotes.
String backQuote(const StringRef & x);
/// Quote the identifier with backquotes, if required.
String backQuoteIfNeed(const StringRef & x);
}

View File

@ -0,0 +1,26 @@
#include <Common/getMultipleKeysFromConfig.h>
#include <Poco/AutoPtr.h>
#include <Poco/Util/XMLConfiguration.h>
#include <gtest/gtest.h>
using namespace DB;
TEST(Common, getMultipleValuesFromConfig)
{
std::istringstream xml_isteam(R"END(<?xml version="1.0"?>
<yandex>
<first_level>
<second_level>0</second_level>
<second_level>1</second_level>
<second_level>2</second_level>
<second_level>3</second_level>
</first_level>
</yandex>)END");
Poco::AutoPtr<Poco::Util::XMLConfiguration> config = new Poco::Util::XMLConfiguration(xml_isteam);
std::vector<std::string> answer = getMultipleValuesFromConfig(*config, "first_level", "second_level");
std::vector<std::string> right_answer = {"0", "1", "2", "3"};
EXPECT_EQ(answer, right_answer);
}

View File

@ -21,14 +21,14 @@ TEST(ThreadPool, ConcurrentWait)
ThreadPool pool(num_threads);
for (size_t i = 0; i < num_jobs; ++i)
pool.schedule(worker);
pool.scheduleOrThrowOnError(worker);
constexpr size_t num_waiting_threads = 4;
ThreadPool waiting_pool(num_waiting_threads);
for (size_t i = 0; i < num_waiting_threads; ++i)
waiting_pool.schedule([&pool]{ pool.wait(); });
waiting_pool.scheduleOrThrowOnError([&pool] { pool.wait(); });
waiting_pool.wait();
}

View File

@ -30,11 +30,11 @@ TEST(ThreadPool, GlobalFull1)
ThreadPool pool(num_jobs);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
for (size_t i = capacity; i < num_jobs; ++i)
{
EXPECT_THROW(pool.schedule(func), DB::Exception);
EXPECT_THROW(pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
}
@ -67,10 +67,10 @@ TEST(ThreadPool, GlobalFull2)
ThreadPool pool(capacity, 0, capacity);
for (size_t i = 0; i < capacity; ++i)
pool.schedule(func);
pool.scheduleOrThrowOnError(func);
ThreadPool another_pool(1);
EXPECT_THROW(another_pool.schedule(func), DB::Exception);
EXPECT_THROW(another_pool.scheduleOrThrowOnError(func), DB::Exception);
++counter;
@ -79,7 +79,7 @@ TEST(ThreadPool, GlobalFull2)
global_pool.wait();
for (size_t i = 0; i < capacity; ++i)
another_pool.schedule([&] { ++counter; });
another_pool.scheduleOrThrowOnError([&] { ++counter; });
another_pool.wait();
EXPECT_EQ(counter, capacity * 2 + 1);

View File

@ -14,7 +14,7 @@ int test()
std::atomic<int> counter{0};
for (size_t i = 0; i < 10; ++i)
pool.schedule([&]{ ++counter; });
pool.scheduleOrThrowOnError([&]{ ++counter; });
pool.wait();
return counter;

View File

@ -14,7 +14,7 @@ TEST(ThreadPool, Loop)
size_t threads = 16;
ThreadPool pool(threads);
for (size_t j = 0; j < threads; ++j)
pool.schedule([&]{ ++res; });
pool.scheduleOrThrowOnError([&] { ++res; });
pool.wait();
}

View File

@ -9,12 +9,12 @@ bool check()
{
ThreadPool pool(10);
pool.schedule([]{ throw std::runtime_error("Hello, world!"); });
pool.scheduleOrThrowOnError([] { throw std::runtime_error("Hello, world!"); });
try
{
for (size_t i = 0; i < 100; ++i)
pool.schedule([]{}); /// An exception will be rethrown from this method.
pool.scheduleOrThrowOnError([] {}); /// An exception will be rethrown from this method.
}
catch (const std::runtime_error &)
{

View File

@ -37,8 +37,8 @@ int main(int, char **)
ThreadPool tp(8);
for (size_t i = 0; i < n; ++i)
{
tp.schedule(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.schedule(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
tp.scheduleOrThrowOnError(std::bind(thread1, std::ref(x), std::ref(results[i])));
tp.scheduleOrThrowOnError(std::bind(thread2, std::ref(x), (rand() % 2) ? s1 : s2));
}
tp.wait();

View File

@ -284,7 +284,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -338,7 +338,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate12,
pool.scheduleOrThrowOnError(std::bind(aggregate12,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -397,7 +397,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -473,7 +473,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate2,
pool.scheduleOrThrowOnError(std::bind(aggregate2,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -499,7 +499,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2,
pool.scheduleOrThrowOnError(std::bind(merge2,
maps.data(), num_threads, i));
pool.wait();
@ -527,7 +527,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate22,
pool.scheduleOrThrowOnError(std::bind(aggregate22,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));
@ -553,7 +553,7 @@ int main(int argc, char ** argv)
watch.restart();
for (size_t i = 0; i < MapTwoLevel::NUM_BUCKETS; ++i)
pool.schedule(std::bind(merge2, maps.data(), num_threads, i));
pool.scheduleOrThrowOnError(std::bind(merge2, maps.data(), num_threads, i));
pool.wait();
@ -592,7 +592,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate3,
pool.scheduleOrThrowOnError(std::bind(aggregate3,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
@ -658,7 +658,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate33,
pool.scheduleOrThrowOnError(std::bind(aggregate33,
std::ref(local_maps[i]),
std::ref(global_map),
std::ref(mutex),
@ -727,7 +727,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate4,
pool.scheduleOrThrowOnError(std::bind(aggregate4,
std::ref(local_maps[i]),
std::ref(global_map),
mutexes.data(),
@ -797,7 +797,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate5,
pool.scheduleOrThrowOnError(std::bind(aggregate5,
std::ref(local_maps[i]),
std::ref(global_map),
data.begin() + (data.size() * i) / num_threads,
@ -860,7 +860,7 @@ int main(int argc, char ** argv)
Stopwatch watch;
for (size_t i = 0; i < num_threads; ++i)
pool.schedule(std::bind(aggregate1,
pool.scheduleOrThrowOnError(std::bind(aggregate1,
std::ref(maps[i]),
data.begin() + (data.size() * i) / num_threads,
data.begin() + (data.size() * (i + 1)) / num_threads));

View File

@ -42,7 +42,7 @@ struct AggregateIndependent
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
for (auto it = begin; it != end; ++it)
{
@ -85,7 +85,7 @@ struct AggregateIndependentWithSequentialKeysOptimization
auto end = data.begin() + (data.size() * (i + 1)) / num_threads;
auto & map = *results[i];
pool.schedule([&, begin, end]()
pool.scheduleOrThrowOnError([&, begin, end]()
{
typename Map::LookupResult place = nullptr;
Key prev_key {};
@ -180,7 +180,7 @@ struct MergeParallelForTwoLevelTable
ThreadPool & pool)
{
for (size_t bucket = 0; bucket < Map::NUM_BUCKETS; ++bucket)
pool.schedule([&, bucket, num_maps]
pool.scheduleOrThrowOnError([&, bucket, num_maps]
{
std::vector<typename Map::Impl *> section(num_maps);
for (size_t i = 0; i < num_maps; ++i)

View File

@ -66,7 +66,7 @@ int main(int argc, char ** argv)
test(n, "Create and destroy ThreadPool each iteration", []
{
ThreadPool tp(1);
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
@ -90,7 +90,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}
@ -100,7 +100,7 @@ int main(int argc, char ** argv)
test(n, "Schedule job for Threadpool with 128 threads each iteration", [&tp]
{
tp.schedule(f);
tp.scheduleOrThrowOnError(f);
tp.wait();
});
}

View File

@ -1,5 +1,6 @@
#include <Compression/CompressionCodecMultiple.h>
#include <Compression/CompressionInfo.h>
#include <Common/PODArray.h>
#include <common/unaligned.h>
#include <Compression/CompressionFactory.h>
#include <IO/ReadHelpers.h>

View File

@ -4,7 +4,6 @@
#include <IO/ReadBuffer.h>
#include <IO/WriteBuffer.h>
#include <IO/BufferWithOwnMemory.h>
#include <Common/PODArray.h>
#include <DataTypes/IDataType.h>
#include <boost/noncopyable.hpp>
#include <IO/UncompressedCache.h>

View File

@ -152,9 +152,8 @@ namespace DB
buf.write(res.data(), res.size());
}
void readBinary(Tuple & x_def, ReadBuffer & buf)
void readBinary(Tuple & x, ReadBuffer & buf)
{
auto & x = x_def.toUnderType();
size_t size;
DB::readBinary(size, buf);
@ -231,9 +230,8 @@ namespace DB
}
}
void writeBinary(const Tuple & x_def, WriteBuffer & buf)
void writeBinary(const Tuple & x, WriteBuffer & buf)
{
auto & x = x_def.toUnderType();
const size_t size = x.size();
DB::writeBinary(size, buf);
@ -292,7 +290,12 @@ namespace DB
void writeText(const Tuple & x, WriteBuffer & buf)
{
DB::String res = applyVisitor(DB::FieldVisitorToString(), DB::Field(x));
writeFieldText(DB::Field(x), buf);
}
void writeFieldText(const Field & x, WriteBuffer & buf)
{
DB::String res = applyVisitor(DB::FieldVisitorToString(), x);
buf.write(res.data(), res.size());
}

View File

@ -34,9 +34,23 @@ template <typename T>
using NearestFieldType = typename NearestFieldTypeImpl<T>::Type;
class Field;
using Array = std::vector<Field>;
using TupleBackend = std::vector<Field>;
STRONG_TYPEDEF(TupleBackend, Tuple) /// Array and Tuple are different types with equal representation inside Field.
using FieldVector = std::vector<Field>;
/// Array and Tuple use the same storage type -- FieldVector, but we declare
/// distinct types for them, so that the caller can choose whether it wants to
/// construct a Field of Array or a Tuple type. An alternative approach would be
/// to construct both of these types from FieldVector, and have the caller
/// specify the desired Field type explicitly.
#define DEFINE_FIELD_VECTOR(X) \
struct X : public FieldVector \
{ \
using FieldVector::FieldVector; \
}
DEFINE_FIELD_VECTOR(Array);
DEFINE_FIELD_VECTOR(Tuple);
#undef DEFINE_FIELD_VECTOR
struct AggregateFunctionStateData
{
@ -457,7 +471,6 @@ private:
void createConcrete(T && x)
{
using UnqualifiedType = std::decay_t<T>;
which = TypeToEnum<UnqualifiedType>::value;
// In both Field and PODArray, small types may be stored as wider types,
// e.g. char is stored as UInt64. Field can return this extended value
@ -466,6 +479,7 @@ private:
// nominal type.
using StorageType = NearestFieldType<UnqualifiedType>;
new (&storage) StorageType(std::forward<T>(x));
which = TypeToEnum<UnqualifiedType>::value;
}
/// Assuming same types.
@ -748,5 +762,7 @@ void writeBinary(const Tuple & x, WriteBuffer & buf);
void writeText(const Tuple & x, WriteBuffer & buf);
void writeFieldText(const Field & x, WriteBuffer & buf);
[[noreturn]] inline void writeQuoted(const Tuple &, WriteBuffer &) { throw Exception("Cannot write Tuple quoted.", ErrorCodes::NOT_IMPLEMENTED); }
}

View File

@ -919,10 +919,10 @@ public:
auto user = context.getUser(user_name);
if (user->password_double_sha1_hex.empty())
if (user->authentication.getType() != DB::Authentication::DOUBLE_SHA1_PASSWORD)
throw Exception("Cannot use " + getName() + " auth plugin for user " + user_name + " since its password isn't specified using double SHA1.", ErrorCodes::UNKNOWN_EXCEPTION);
Poco::SHA1Engine::Digest double_sha1_value = Poco::DigestEngine::digestFromHex(user->password_double_sha1_hex);
Poco::SHA1Engine::Digest double_sha1_value = user->authentication.getPasswordHashBinary();
assert(double_sha1_value.size() == Poco::SHA1Engine::DIGEST_SIZE);
Poco::SHA1Engine engine;

View File

@ -176,8 +176,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, input_format_skip_unknown_fields, false, "Skip columns with unknown names from input data (it works for JSONEachRow, CSVWithNames, TSVWithNames and TSKV formats).") \
M(SettingBool, input_format_with_names_use_header, false, "For TSVWithNames and CSVWithNames input formats this controls whether format parser is to assume that column data appear in the input exactly as they are specified in the header.") \
M(SettingBool, input_format_import_nested_json, false, "Map nested JSON data to nested tables (it works for JSONEachRow format).") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow format).") \
M(SettingBool, input_format_null_as_default, false, "For CSV format initialize null fields with default values if data type of this field is not nullable") \
M(SettingBool, input_format_defaults_for_omitted_fields, true, "For input data calculate default expressions for omitted fields (it works for JSONEachRow, CSV and TSV formats).") \
M(SettingBool, input_format_tsv_empty_as_default, false, "Treat empty fields in TSV input as default values.") \
M(SettingBool, input_format_null_as_default, false, "For text input formats initialize null fields with default values if data type of this field is not nullable") \
\
M(SettingBool, input_format_values_interpret_expressions, true, "For Values format: if field could not be parsed by streaming parser, run SQL parser and try to interpret it as SQL expression.") \
M(SettingBool, input_format_values_deduce_templates_of_expressions, false, "For Values format: if field could not be parsed by streaming parser, run SQL parser, deduce template of the SQL expression, try to parse all rows using template and then interpret expression for all rows.") \
@ -202,8 +203,8 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, fsync_metadata, 1, "Do fsync after changing metadata for tables and databases (.sql files). Could be disabled in case of poor latency on server with high load of DDL queries and high load of disk subsystem.") \
\
M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if both absolute and relative values are non-zero, and at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingUInt64, input_format_allow_errors_num, 0, "Maximum absolute amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
M(SettingFloat, input_format_allow_errors_ratio, 0, "Maximum relative amount of errors while reading text formats (like CSV, TSV). In case of error, if at least absolute or relative amount of errors is lower than corresponding value, will skip until next line and continue.") \
\
M(SettingBool, join_use_nulls, 0, "Use NULLs for non-joined rows of outer JOINs for types that can be inside Nullable. If false, use default value of corresponding columns data type.") \
\
@ -306,8 +307,9 @@ struct Settings : public SettingsCollection<Settings>
M(SettingBool, join_any_take_last_row, false, "When disabled (default) ANY JOIN will take the first found row for a key. When enabled, it will take the last row seen if there are multiple rows for the same key.") \
M(SettingBool, partial_merge_join, false, "Use partial merge join instead of hash join for LEFT and INNER JOINs.") \
M(SettingBool, partial_merge_join_optimizations, false, "Enable optimizations in partial merge join") \
M(SettingUInt64, default_max_bytes_in_join, 100000000, "Maximum size of right-side table if limit's required but max_bytes_in_join is not set.") \
M(SettingUInt64, partial_merge_join_rows_in_right_blocks, 10000, "Split right-hand joining data in blocks of specified size. It's a portion of data indexed by min-max values and possibly unloaded on disk.") \
M(SettingFloat, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increase JOIN performance and memory usage.") \
M(SettingUInt64, partial_merge_join_rows_in_left_blocks, 10000, "Group left-hand joining data in bigger blocks. Setting it to a bigger value increase JOIN performance and memory usage.") \
\
M(SettingUInt64, max_rows_to_transfer, 0, "Maximum size (in rows) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
M(SettingUInt64, max_bytes_to_transfer, 0, "Maximum size (in uncompressed bytes) of the transmitted external table obtained when the GLOBAL IN/JOIN section is executed.") \
@ -344,7 +346,6 @@ struct Settings : public SettingsCollection<Settings>
\
M(SettingBool, prefer_localhost_replica, 1, "1 - always send query to local replica, if it exists. 0 - choose replica to send query between local and remote ones according to load_balancing") \
M(SettingUInt64, max_fetch_partition_retries_count, 5, "Amount of retries while fetching partition from another host.") \
M(SettingBool, asterisk_left_columns_only, 0, "If it is set to true, the asterisk only return left of join query.") \
M(SettingUInt64, http_max_multipart_form_data_size, 1024 * 1024 * 1024, "Limit on size of multipart/form-data content. This setting cannot be parsed from URL parameters and should be set in user profile. Note that content is parsed and external tables are created in memory before start of query execution. And this is the only limit that has effect on that stage (limits on max memory usage and max execution time have no effect while reading HTTP form data).") \
M(SettingBool, calculate_text_stack_trace, 1, "Calculate text stack trace in case of exceptions during query execution. This is the default. It requires symbol lookups that may slow down fuzzing tests when huge amount of wrong queries are executed. In normal cases you should not disable this option.") \
M(SettingBool, allow_ddl, true, "If it is set to true, then a user is allowed to executed DDL queries.") \

View File

@ -36,7 +36,7 @@ void AsynchronousBlockInputStream::next()
{
ready.reset();
pool.schedule([this, thread_group = CurrentThread::getGroup()] ()
pool.scheduleOrThrowOnError([this, thread_group = CurrentThread::getGroup()]()
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::QueryThread};

View File

@ -4,6 +4,7 @@
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnsNumber.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <Common/FieldVisitors.h>

View File

@ -1,85 +0,0 @@
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/CheckNonEmptySetBlockInputStream.h>
#include <Interpreters/Set.h>
#include <Interpreters/Join.h>
#include <Columns/ColumnSet.h>
#include <Columns/ColumnConst.h>
namespace DB
{
CheckNonEmptySetBlockInputStream::CheckNonEmptySetBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const NameSet sets_)
: expression(expression_), sets(sets_)
{
children.push_back(input);
cached_header = children.back()->getHeader();
}
String CheckNonEmptySetBlockInputStream::getName() const { return "CheckNonEmptySet"; }
Block CheckNonEmptySetBlockInputStream::getTotals()
{
return children.back()->getTotals();
}
Block CheckNonEmptySetBlockInputStream::getHeader() const
{
return cached_header.cloneEmpty();
}
Block CheckNonEmptySetBlockInputStream::readImpl()
{
if (!initialized)
{
/// CheckNonEmptyBlockInputStream in the downstream with CreatingSetsBlockInputStream. So set has been created.
cached_result = inOrInnerRightJoinWithEmpty();
initialized = true;
}
Block res;
if (isCancelledOrThrowIfKilled() || cached_result)
return res;
return children.back()->read();
}
bool CheckNonEmptySetBlockInputStream::inOrInnerRightJoinWithEmpty() const
{
InOrInnerRightJoinWithEmpty checker;
for (const auto & action : expression->getActions())
{
if (action.type == ExpressionAction::ARRAY_JOIN)
{
return false;
}
else if (action.type == ExpressionAction::JOIN)
{
if (const auto * join = dynamic_cast<Join *>(action.join.get()))
{
checker.hasJoin = true;
checker.innerRightJoinWithEmpty &= join->getTotalRowCount() == 0 && isInnerOrRight(join->getKind());
}
}
else if (action.type == ExpressionAction::ADD_COLUMN)
{
if (!sets.count(action.result_name))
continue;
checker.hasIn = true;
ColumnPtr column_set_ptr = action.added_column;
const ColumnSet * column_set = typeid_cast<const ColumnSet *>(&*column_set_ptr);
checker.inWithEmpty &= column_set && column_set->getData()->getTotalRowCount() == 0;
}
}
/// Get the final result.
return checker.result();
}
}

View File

@ -1,63 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Core/Names.h>
namespace DB
{
class CheckNonEmptySetBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
CheckNonEmptySetBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_, const NameSet sets_);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
private:
Block cached_header;
ExpressionActionsPtr expression;
bool initialized = false;
bool cached_result = false;
NameSet sets;
bool inOrInnerRightJoinWithEmpty() const;
/**
* Used to determine if actions are IN OR INNER/RIGHT JOIN with empty.
*/
struct InOrInnerRightJoinWithEmpty
{
bool hasJoin = false;
bool hasIn = false;
bool innerRightJoinWithEmpty = true;
bool inWithEmpty = true;
bool result()
{
if (hasJoin && !hasIn)
return innerRightJoinWithEmpty;
else if (hasIn && !hasJoin)
return inWithEmpty;
else if (hasJoin && hasIn)
return innerRightJoinWithEmpty && inWithEmpty;
return false;
}
};
};
}

View File

@ -2,6 +2,7 @@
#include <Interpreters/castColumn.h>
#include <Columns/ColumnConst.h>
#include <Common/assert_cast.h>
#include <Common/quoteString.h>
#include <Parsers/IAST.h>

View File

@ -30,6 +30,14 @@ Block ExpressionBlockInputStream::getHeader() const
Block ExpressionBlockInputStream::readImpl()
{
if (!initialized)
{
if (expression->resultIsAlwaysEmpty())
return {};
initialized = true;
}
Block res = children.back()->read();
if (res)
expression->execute(res);

View File

@ -31,6 +31,7 @@ protected:
private:
ExpressionActionsPtr expression;
Block cached_header;
bool initialized = false;
};
}

View File

@ -17,9 +17,11 @@ namespace ErrorCodes
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name, bool remove_filter_)
: remove_filter(remove_filter_), expression(expression_)
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_)
: remove_filter(remove_filter_)
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
{
children.push_back(input);
@ -72,6 +74,9 @@ Block FilterBlockInputStream::readImpl()
if (constant_filter_description.always_false)
return removeFilterIfNeed(std::move(res));
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
return {};
/// Until non-empty block after filtering or end of stream.
while (1)
{

View File

@ -20,8 +20,8 @@ private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, const ExpressionActionsPtr & expression_,
const String & filter_column_name_, bool remove_filter_ = false);
FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_ = false);
String getName() const override;
Block getTotals() override;
@ -35,6 +35,7 @@ protected:
private:
ExpressionActionsPtr expression;
Block header;
String filter_column_name;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;

View File

@ -1,4 +1,5 @@
#include <DataStreams/LimitByBlockInputStream.h>
#include <Common/PODArray.h>
#include <Common/SipHash.h>

View File

@ -2,7 +2,7 @@
#include <DataStreams/MergeSortingBlockInputStream.h>
#include <DataStreams/MergingSortedBlockInputStream.h>
#include <DataStreams/NativeBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/TemporaryFileStream.h>
#include <DataStreams/processConstants.h>
#include <Common/formatReadable.h>
#include <IO/WriteBufferFromFile.h>
@ -84,14 +84,11 @@ Block MergeSortingBlockInputStream::readImpl()
temporary_files.emplace_back(createTemporaryFile(tmp_path));
const std::string & path = temporary_files.back()->path();
WriteBufferFromFile file_buf(path);
CompressedWriteBuffer compressed_buf(file_buf);
NativeBlockOutputStream block_out(compressed_buf, 0, header_without_constants);
MergeSortingBlocksBlockInputStream block_in(blocks, description, max_merged_block_size, limit);
LOG_INFO(log, "Sorting and writing part of data into temporary file " + path);
ProfileEvents::increment(ProfileEvents::ExternalSortWritePart);
copyData(block_in, block_out, &is_cancelled); /// NOTE. Possibly limit disk usage.
TemporaryFileStream::write(path, header_without_constants, block_in, &is_cancelled); /// NOTE. Possibly limit disk usage.
LOG_INFO(log, "Done writing part of data into temporary file " + path);
blocks.clear();
@ -138,7 +135,7 @@ Block MergeSortingBlockInputStream::readImpl()
MergeSortingBlocksBlockInputStream::MergeSortingBlocksBlockInputStream(
Blocks & blocks_, SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
Blocks & blocks_, const SortDescription & description_, size_t max_merged_block_size_, UInt64 limit_)
: blocks(blocks_), header(blocks.at(0).cloneEmpty()), description(description_), max_merged_block_size(max_merged_block_size_), limit(limit_)
{
Blocks nonempty_blocks;

View File

@ -3,14 +3,13 @@
#include <queue>
#include <common/logger_useful.h>
#include <Common/filesystemHelpers.h>
#include <Common/filesystemHelpers.h>
#include <Core/SortDescription.h>
#include <Core/SortCursor.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/NativeBlockInputStream.h>
#include <DataStreams/TemporaryFileStream.h>
#include <IO/ReadBufferFromFile.h>
#include <Compression/CompressedReadBuffer.h>
@ -19,6 +18,8 @@
namespace DB
{
struct TemporaryFileStream;
namespace ErrorCodes
{
extern const int NOT_ENOUGH_SPACE;
@ -34,7 +35,7 @@ class MergeSortingBlocksBlockInputStream : public IBlockInputStream
{
public:
/// limit - if not 0, allowed to return just first 'limit' rows in sorted order.
MergeSortingBlocksBlockInputStream(Blocks & blocks_, SortDescription & description_,
MergeSortingBlocksBlockInputStream(Blocks & blocks_, const SortDescription & description_,
size_t max_merged_block_size_, UInt64 limit_ = 0);
String getName() const override { return "MergeSortingBlocks"; }

Some files were not shown because too many files have changed in this diff Show More