Merge with consistent_metadata2

This commit is contained in:
alesapin 2020-05-28 19:45:51 +03:00
commit 860dc2c814
138 changed files with 2196 additions and 1090 deletions

View File

@ -112,16 +112,21 @@ if (PROTOBUF_GENERATE_CPP_SCRIPT_MODE)
set (intermediate_dir ${DIR}/intermediate)
set (intermediate_output "${intermediate_dir}/${FILENAME}")
if (COMPILER_ID STREQUAL "GNU")
if (COMPILER_ID STREQUAL "Clang")
set (pragma_push "#pragma clang diagnostic push\n")
set (pragma_pop "#pragma clang diagnostic pop\n")
set (pragma_disable_warnings "#pragma clang diagnostic ignored \"-Weverything\"\n")
elseif (COMPILER_ID MATCHES "GNU")
set (pragma_push "#pragma GCC diagnostic push\n")
set (pragma_pop "#pragma GCC diagnostic pop\n")
set (pragma_disable_warnings "#pragma GCC diagnostic ignored \"-Wall\"\n"
"#pragma GCC diagnostic ignored \"-Wextra\"\n"
"#pragma GCC diagnostic ignored \"-Warray-bounds\"\n")
elseif (COMPILER_ID MATCHES "Clang")
set (pragma_push "#pragma clang diagnostic push\n")
set (pragma_pop "#pragma clang diagnostic pop\n")
set (pragma_disable_warnings "#pragma clang diagnostic ignored \"-Weverything\"\n")
"#pragma GCC diagnostic ignored \"-Warray-bounds\"\n"
"#pragma GCC diagnostic ignored \"-Wold-style-cast\"\n"
"#pragma GCC diagnostic ignored \"-Wshadow\"\n"
"#pragma GCC diagnostic ignored \"-Wsuggest-override\"\n"
"#pragma GCC diagnostic ignored \"-Wcast-qual\"\n"
"#pragma GCC diagnostic ignored \"-Wunused-parameter\"\n")
endif()
if (${FILENAME} MATCHES ".*\\.h")

View File

@ -618,7 +618,12 @@ if (USE_INTERNAL_CCTZ)
add_library(tzdata STATIC ${TZ_OBJS})
set_target_properties(tzdata PROPERTIES LINKER_LANGUAGE C)
target_link_libraries(cctz -Wl,--whole-archive tzdata -Wl,--no-whole-archive) # whole-archive prevents symbols from being discarded
# whole-archive prevents symbols from being discarded for unknown reason
# CMake can shuffle each of target_link_libraries arguments with other
# libraries in linker command. To avoid this we hardcode whole-archive
# library into single string.
add_dependencies(cctz tzdata)
target_link_libraries(cctz INTERFACE "-Wl,--whole-archive $<TARGET_FILE:tzdata> -Wl,--no-whole-archive")
endif ()
else ()

2
contrib/jemalloc vendored

@ -1 +1 @@
Subproject commit cd2931ad9bbd78208565716ab102e86d858c2fff
Subproject commit ea6b3e973b477b8061e0076bb257dbd7f3faa756

View File

@ -17,7 +17,13 @@ if (ENABLE_JEMALLOC)
#
# By enabling percpu_arena number of arenas limited to number of CPUs and hence
# this problem should go away.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu" CACHE STRING "Change default configuration string of JEMalloc" )
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0")
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache
set (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE "" CACHE STRING "Change default configuration string of JEMalloc" )
if (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE)
set (JEMALLOC_CONFIG_MALLOC_CONF "${JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE}")
endif()
message (STATUS "jemalloc malloc_conf: ${JEMALLOC_CONFIG_MALLOC_CONF}")
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/jemalloc")
@ -55,6 +61,7 @@ if (ENABLE_JEMALLOC)
${LIBRARY_DIR}/src/ticker.c
${LIBRARY_DIR}/src/tsd.c
${LIBRARY_DIR}/src/witness.c
${LIBRARY_DIR}/src/safety_check.c
)
if (OS_DARWIN)
list(APPEND SRCS ${LIBRARY_DIR}/src/zone.c)
@ -89,6 +96,8 @@ if (ENABLE_JEMALLOC)
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(jemalloc PRIVATE -D_GNU_SOURCE)
else ()
find_library(LIBRARY_JEMALLOC jemalloc)
find_path(INCLUDE_JEMALLOC jemalloc/jemalloc.h)

View File

@ -5,6 +5,12 @@
/* Defined if alloc_size attribute is supported. */
#define JEMALLOC_HAVE_ATTR_ALLOC_SIZE
/* Defined if format_arg(...) attribute is supported. */
#define JEMALLOC_HAVE_ATTR_FORMAT_ARG
/* Defined if format(gnu_printf, ...) attribute is supported. */
#define JEMALLOC_HAVE_ATTR_FORMAT_GNU_PRINTF
/* Defined if format(printf, ...) attribute is supported. */
#define JEMALLOC_HAVE_ATTR_FORMAT_PRINTF

View File

@ -4,12 +4,13 @@
#include <limits.h>
#include <strings.h>
#define JEMALLOC_VERSION "5.1.0-56-g41b7372eadee941b9164751b8d4963f915d3ceae"
#define JEMALLOC_VERSION "5.2.1-0-gea6b3e973b477b8061e0076bb257dbd7f3faa756"
#define JEMALLOC_VERSION_MAJOR 5
#define JEMALLOC_VERSION_MINOR 1
#define JEMALLOC_VERSION_BUGFIX 0
#define JEMALLOC_VERSION_NREV 56
#define JEMALLOC_VERSION_GID "41b7372eadee941b9164751b8d4963f915d3ceae"
#define JEMALLOC_VERSION_MINOR 2
#define JEMALLOC_VERSION_BUGFIX 1
#define JEMALLOC_VERSION_NREV 0
#define JEMALLOC_VERSION_GID "ea6b3e973b477b8061e0076bb257dbd7f3faa756"
#define JEMALLOC_VERSION_GID_IDENT ea6b3e973b477b8061e0076bb257dbd7f3faa756
#define MALLOCX_LG_ALIGN(la) ((int)(la))
#if LG_SIZEOF_PTR == 2
@ -68,6 +69,7 @@
# define JEMALLOC_EXPORT __declspec(dllimport)
# endif
# endif
# define JEMALLOC_FORMAT_ARG(i)
# define JEMALLOC_FORMAT_PRINTF(s, i)
# define JEMALLOC_NOINLINE __declspec(noinline)
# ifdef __cplusplus
@ -95,6 +97,11 @@
# ifndef JEMALLOC_EXPORT
# define JEMALLOC_EXPORT JEMALLOC_ATTR(visibility("default"))
# endif
# ifdef JEMALLOC_HAVE_ATTR_FORMAT_ARG
# define JEMALLOC_FORMAT_ARG(i) JEMALLOC_ATTR(__format_arg__(3))
# else
# define JEMALLOC_FORMAT_ARG(i)
# endif
# ifdef JEMALLOC_HAVE_ATTR_FORMAT_GNU_PRINTF
# define JEMALLOC_FORMAT_PRINTF(s, i) JEMALLOC_ATTR(format(gnu_printf, s, i))
# elif defined(JEMALLOC_HAVE_ATTR_FORMAT_PRINTF)

View File

@ -17,6 +17,7 @@
# define je_malloc_stats_print malloc_stats_print
# define je_malloc_usable_size malloc_usable_size
# define je_mallocx mallocx
# define je_smallocx_ea6b3e973b477b8061e0076bb257dbd7f3faa756 smallocx_ea6b3e973b477b8061e0076bb257dbd7f3faa756
# define je_nallocx nallocx
# define je_posix_memalign posix_memalign
# define je_rallocx rallocx

View File

@ -65,13 +65,13 @@ typedef bool (extent_merge_t)(extent_hooks_t *, void *, size_t, void *, size_t,
bool, unsigned);
struct extent_hooks_s {
extent_alloc_t *alloc;
extent_dalloc_t *dalloc;
extent_destroy_t *destroy;
extent_commit_t *commit;
extent_decommit_t *decommit;
extent_purge_t *purge_lazy;
extent_purge_t *purge_forced;
extent_split_t *split;
extent_merge_t *merge;
extent_alloc_t *alloc;
extent_dalloc_t *dalloc;
extent_destroy_t *destroy;
extent_commit_t *commit;
extent_decommit_t *decommit;
extent_purge_t *purge_lazy;
extent_purge_t *purge_forced;
extent_split_t *split;
extent_merge_t *merge;
};

View File

@ -1,12 +1,6 @@
/* include/jemalloc/internal/jemalloc_internal_defs.h. Generated from jemalloc_internal_defs.h.in by configure. */
#ifndef JEMALLOC_INTERNAL_DEFS_H_
#define JEMALLOC_INTERNAL_DEFS_H_
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/*
* If JEMALLOC_PREFIX is defined via --with-jemalloc-prefix, it will cause all
* public APIs to be prefixed. This makes it possible, with some care, to use
@ -25,7 +19,7 @@
#define JEMALLOC_OVERRIDE___LIBC_MEMALIGN
#define JEMALLOC_OVERRIDE___LIBC_REALLOC
#define JEMALLOC_OVERRIDE___LIBC_VALLOC
#define JEMALLOC_OVERRIDE___POSIX_MEMALIGN
/* #undef JEMALLOC_OVERRIDE___POSIX_MEMALIGN */
/*
* JEMALLOC_PRIVATE_NAMESPACE is used as a prefix for all library-private APIs.
@ -41,7 +35,7 @@
*/
#define CPU_SPINWAIT
/* 1 if CPU_SPINWAIT is defined, 0 otherwise. */
#define HAVE_CPU_SPINWAIT 0
#define HAVE_CPU_SPINWAIT 9
/*
* Number of significant bits in virtual addresses. This may be less than the
@ -55,25 +49,13 @@
/* Defined if GCC __atomic atomics are available. */
#define JEMALLOC_GCC_ATOMIC_ATOMICS 1
/* and the 8-bit variant support. */
#define JEMALLOC_GCC_U8_ATOMIC_ATOMICS 1
/* Defined if GCC __sync atomics are available. */
#define JEMALLOC_GCC_SYNC_ATOMICS 1
/*
* Defined if __sync_add_and_fetch(uint32_t *, uint32_t) and
* __sync_sub_and_fetch(uint32_t *, uint32_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_4 */
/*
* Defined if __sync_add_and_fetch(uint64_t *, uint64_t) and
* __sync_sub_and_fetch(uint64_t *, uint64_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_8 */
/* and the 8-bit variant support. */
#define JEMALLOC_GCC_U8_SYNC_ATOMICS 1
/*
* Defined if __builtin_clz() and __builtin_clzl() are available.
@ -85,19 +67,13 @@
*/
/* #undef JEMALLOC_OS_UNFAIR_LOCK */
/*
* Defined if OSSpin*() functions are available, as provided by Darwin, and
* documented in the spinlock(3) manual page.
*/
/* #undef JEMALLOC_OSSPIN */
/* Defined if syscall(2) is usable. */
#define JEMALLOC_USE_SYSCALL
/*
* Defined if secure_getenv(3) is available.
*/
#define JEMALLOC_HAVE_SECURE_GETENV
// #define JEMALLOC_HAVE_SECURE_GETENV
/*
* Defined if issetugid(2) is available.
@ -243,6 +219,12 @@
#define JEMALLOC_INTERNAL_FFSL __builtin_ffsl
#define JEMALLOC_INTERNAL_FFS __builtin_ffs
/*
* popcount*() functions to use for bitmapping.
*/
#define JEMALLOC_INTERNAL_POPCOUNTL __builtin_popcountl
#define JEMALLOC_INTERNAL_POPCOUNT __builtin_popcount
/*
* If defined, explicitly attempt to more uniformly distribute large allocation
* pointer alignments across all cache indices.
@ -297,7 +279,7 @@
* MADV_FREE, though typically with higher
* system overhead.
*/
// #define JEMALLOC_PURGE_MADVISE_FREE
#define JEMALLOC_PURGE_MADVISE_FREE
#define JEMALLOC_PURGE_MADVISE_DONTNEED
#define JEMALLOC_PURGE_MADVISE_DONTNEED_ZEROS
@ -379,4 +361,7 @@
*/
#define JEMALLOC_STRERROR_R_RETURNS_CHAR_WITH_GNU_SOURCE
/* Performs additional safety checks when defined. */
/* #undef JEMALLOC_OPT_SAFETY_CHECKS */
#endif /* JEMALLOC_INTERNAL_DEFS_H_ */

View File

@ -21,7 +21,7 @@
# include "jemalloc/jemalloc.h"
#endif
#if (defined(JEMALLOC_OSATOMIC) || defined(JEMALLOC_OSSPIN))
#if defined(JEMALLOC_OSATOMIC)
#include <libkern/OSAtomic.h>
#endif
@ -161,7 +161,26 @@ static const bool config_log =
false
#endif
;
#ifdef JEMALLOC_HAVE_SCHED_GETCPU
/*
* Are extra safety checks enabled; things like checking the size of sized
* deallocations, double-frees, etc.
*/
static const bool config_opt_safety_checks =
#ifdef JEMALLOC_OPT_SAFETY_CHECKS
true
#elif defined(JEMALLOC_DEBUG)
/*
* This lets us only guard safety checks by one flag instead of two; fast
* checks can guard solely by config_opt_safety_checks and run in debug mode
* too.
*/
true
#else
false
#endif
;
#if defined(_WIN32) || defined(JEMALLOC_HAVE_SCHED_GETCPU)
/* Currently percpu_arena depends on sched_getcpu. */
#define JEMALLOC_PERCPU_ARENA
#endif

View File

@ -1,123 +0,0 @@
#include <stdlib.h>
#include <stdbool.h>
#include <stdint.h>
#include <limits.h>
#include <strings.h>
#define JEMALLOC_VERSION "5.1.0-97-gcd2931ad9bbd78208565716ab102e86d858c2fff"
#define JEMALLOC_VERSION_MAJOR 5
#define JEMALLOC_VERSION_MINOR 1
#define JEMALLOC_VERSION_BUGFIX 0
#define JEMALLOC_VERSION_NREV 97
#define JEMALLOC_VERSION_GID "cd2931ad9bbd78208565716ab102e86d858c2fff"
#define JEMALLOC_VERSION_GID_IDENT cd2931ad9bbd78208565716ab102e86d858c2fff
#define MALLOCX_LG_ALIGN(la) ((int)(la))
#if LG_SIZEOF_PTR == 2
# define MALLOCX_ALIGN(a) ((int)(ffs((int)(a))-1))
#else
# define MALLOCX_ALIGN(a) \
((int)(((size_t)(a) < (size_t)INT_MAX) ? ffs((int)(a))-1 : \
ffs((int)(((size_t)(a))>>32))+31))
#endif
#define MALLOCX_ZERO ((int)0x40)
/*
* Bias tcache index bits so that 0 encodes "automatic tcache management", and 1
* encodes MALLOCX_TCACHE_NONE.
*/
#define MALLOCX_TCACHE(tc) ((int)(((tc)+2) << 8))
#define MALLOCX_TCACHE_NONE MALLOCX_TCACHE(-1)
/*
* Bias arena index bits so that 0 encodes "use an automatically chosen arena".
*/
#define MALLOCX_ARENA(a) ((((int)(a))+1) << 20)
/*
* Use as arena index in "arena.<i>.{purge,decay,dss}" and
* "stats.arenas.<i>.*" mallctl interfaces to select all arenas. This
* definition is intentionally specified in raw decimal format to support
* cpp-based string concatenation, e.g.
*
* #define STRINGIFY_HELPER(x) #x
* #define STRINGIFY(x) STRINGIFY_HELPER(x)
*
* mallctl("arena." STRINGIFY(MALLCTL_ARENAS_ALL) ".purge", NULL, NULL, NULL,
* 0);
*/
#define MALLCTL_ARENAS_ALL 4096
/*
* Use as arena index in "stats.arenas.<i>.*" mallctl interfaces to select
* destroyed arenas.
*/
#define MALLCTL_ARENAS_DESTROYED 4097
#if defined(__cplusplus) && defined(JEMALLOC_USE_CXX_THROW)
# define JEMALLOC_CXX_THROW throw()
#else
# define JEMALLOC_CXX_THROW
#endif
#if defined(_MSC_VER)
# define JEMALLOC_ATTR(s)
# define JEMALLOC_ALIGNED(s) __declspec(align(s))
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# ifndef JEMALLOC_EXPORT
# ifdef DLLEXPORT
# define JEMALLOC_EXPORT __declspec(dllexport)
# else
# define JEMALLOC_EXPORT __declspec(dllimport)
# endif
# endif
# define JEMALLOC_FORMAT_PRINTF(s, i)
# define JEMALLOC_NOINLINE __declspec(noinline)
# ifdef __cplusplus
# define JEMALLOC_NOTHROW __declspec(nothrow)
# else
# define JEMALLOC_NOTHROW
# endif
# define JEMALLOC_SECTION(s) __declspec(allocate(s))
# define JEMALLOC_RESTRICT_RETURN __declspec(restrict)
# if _MSC_VER >= 1900 && !defined(__EDG__)
# define JEMALLOC_ALLOCATOR __declspec(allocator)
# else
# define JEMALLOC_ALLOCATOR
# endif
#elif defined(JEMALLOC_HAVE_ATTR)
# define JEMALLOC_ATTR(s) __attribute__((s))
# define JEMALLOC_ALIGNED(s) JEMALLOC_ATTR(aligned(s))
# ifdef JEMALLOC_HAVE_ATTR_ALLOC_SIZE
# define JEMALLOC_ALLOC_SIZE(s) JEMALLOC_ATTR(alloc_size(s))
# define JEMALLOC_ALLOC_SIZE2(s1, s2) JEMALLOC_ATTR(alloc_size(s1, s2))
# else
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# endif
# ifndef JEMALLOC_EXPORT
# define JEMALLOC_EXPORT JEMALLOC_ATTR(visibility("default"))
# endif
# ifdef JEMALLOC_HAVE_ATTR_FORMAT_GNU_PRINTF
# define JEMALLOC_FORMAT_PRINTF(s, i) JEMALLOC_ATTR(format(gnu_printf, s, i))
# elif defined(JEMALLOC_HAVE_ATTR_FORMAT_PRINTF)
# define JEMALLOC_FORMAT_PRINTF(s, i) JEMALLOC_ATTR(format(printf, s, i))
# else
# define JEMALLOC_FORMAT_PRINTF(s, i)
# endif
# define JEMALLOC_NOINLINE JEMALLOC_ATTR(noinline)
# define JEMALLOC_NOTHROW JEMALLOC_ATTR(nothrow)
# define JEMALLOC_SECTION(s) JEMALLOC_ATTR(section(s))
# define JEMALLOC_RESTRICT_RETURN
# define JEMALLOC_ALLOCATOR
#else
# define JEMALLOC_ATTR(s)
# define JEMALLOC_ALIGNED(s)
# define JEMALLOC_ALLOC_SIZE(s)
# define JEMALLOC_ALLOC_SIZE2(s1, s2)
# define JEMALLOC_EXPORT
# define JEMALLOC_FORMAT_PRINTF(s, i)
# define JEMALLOC_NOINLINE
# define JEMALLOC_NOTHROW
# define JEMALLOC_SECTION(s)
# define JEMALLOC_RESTRICT_RETURN
# define JEMALLOC_ALLOCATOR
#endif

View File

@ -1,77 +0,0 @@
typedef struct extent_hooks_s extent_hooks_t;
/*
* void *
* extent_alloc(extent_hooks_t *extent_hooks, void *new_addr, size_t size,
* size_t alignment, bool *zero, bool *commit, unsigned arena_ind);
*/
typedef void *(extent_alloc_t)(extent_hooks_t *, void *, size_t, size_t, bool *,
bool *, unsigned);
/*
* bool
* extent_dalloc(extent_hooks_t *extent_hooks, void *addr, size_t size,
* bool committed, unsigned arena_ind);
*/
typedef bool (extent_dalloc_t)(extent_hooks_t *, void *, size_t, bool,
unsigned);
/*
* void
* extent_destroy(extent_hooks_t *extent_hooks, void *addr, size_t size,
* bool committed, unsigned arena_ind);
*/
typedef void (extent_destroy_t)(extent_hooks_t *, void *, size_t, bool,
unsigned);
/*
* bool
* extent_commit(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_commit_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
unsigned);
/*
* bool
* extent_decommit(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_decommit_t)(extent_hooks_t *, void *, size_t, size_t,
size_t, unsigned);
/*
* bool
* extent_purge(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t offset, size_t length, unsigned arena_ind);
*/
typedef bool (extent_purge_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
unsigned);
/*
* bool
* extent_split(extent_hooks_t *extent_hooks, void *addr, size_t size,
* size_t size_a, size_t size_b, bool committed, unsigned arena_ind);
*/
typedef bool (extent_split_t)(extent_hooks_t *, void *, size_t, size_t, size_t,
bool, unsigned);
/*
* bool
* extent_merge(extent_hooks_t *extent_hooks, void *addr_a, size_t size_a,
* void *addr_b, size_t size_b, bool committed, unsigned arena_ind);
*/
typedef bool (extent_merge_t)(extent_hooks_t *, void *, size_t, void *, size_t,
bool, unsigned);
struct extent_hooks_s {
extent_alloc_t *alloc;
extent_dalloc_t *dalloc;
extent_destroy_t *destroy;
extent_commit_t *commit;
extent_decommit_t *decommit;
extent_purge_t *purge_lazy;
extent_purge_t *purge_forced;
extent_split_t *split;
extent_merge_t *merge;
};

View File

@ -1,11 +1,6 @@
/* include/jemalloc/internal/jemalloc_internal_defs.h. Generated from jemalloc_internal_defs.h.in by configure. */
#ifndef JEMALLOC_INTERNAL_DEFS_H_
#define JEMALLOC_INTERNAL_DEFS_H_
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
/*
* If JEMALLOC_PREFIX is defined via --with-jemalloc-prefix, it will cause all
* public APIs to be prefixed. This makes it possible, with some care, to use
@ -24,7 +19,7 @@
#define JEMALLOC_OVERRIDE___LIBC_MEMALIGN
#define JEMALLOC_OVERRIDE___LIBC_REALLOC
#define JEMALLOC_OVERRIDE___LIBC_VALLOC
#define JEMALLOC_OVERRIDE___POSIX_MEMALIGN
/* #undef JEMALLOC_OVERRIDE___POSIX_MEMALIGN */
/*
* JEMALLOC_PRIVATE_NAMESPACE is used as a prefix for all library-private APIs.
@ -54,25 +49,13 @@
/* Defined if GCC __atomic atomics are available. */
#define JEMALLOC_GCC_ATOMIC_ATOMICS 1
/* and the 8-bit variant support. */
#define JEMALLOC_GCC_U8_ATOMIC_ATOMICS 1
/* Defined if GCC __sync atomics are available. */
#define JEMALLOC_GCC_SYNC_ATOMICS 1
/*
* Defined if __sync_add_and_fetch(uint32_t *, uint32_t) and
* __sync_sub_and_fetch(uint32_t *, uint32_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_4 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_4 */
/*
* Defined if __sync_add_and_fetch(uint64_t *, uint64_t) and
* __sync_sub_and_fetch(uint64_t *, uint64_t) are available, despite
* __GCC_HAVE_SYNC_COMPARE_AND_SWAP_8 not being defined (which means the
* functions are defined in libgcc instead of being inlines).
*/
/* #undef JE_FORCE_SYNC_COMPARE_AND_SWAP_8 */
/* and the 8-bit variant support. */
#define JEMALLOC_GCC_U8_SYNC_ATOMICS 1
/*
* Defined if __builtin_clz() and __builtin_clzl() are available.
@ -84,20 +67,13 @@
*/
/* #undef JEMALLOC_OS_UNFAIR_LOCK */
/*
* Defined if OSSpin*() functions are available, as provided by Darwin, and
* documented in the spinlock(3) manual page.
*/
/* #undef JEMALLOC_OSSPIN */
/* Defined if syscall(2) is usable. */
#define JEMALLOC_USE_SYSCALL
/*
* Defined if secure_getenv(3) is available.
*/
// Don't want dependency on newer GLIBC
//#define JEMALLOC_HAVE_SECURE_GETENV
// #define JEMALLOC_HAVE_SECURE_GETENV
/*
* Defined if issetugid(2) is available.
@ -160,6 +136,9 @@
/* JEMALLOC_STATS enables statistics calculation. */
#define JEMALLOC_STATS
/* JEMALLOC_EXPERIMENTAL_SMALLOCX_API enables experimental smallocx API. */
/* #undef JEMALLOC_EXPERIMENTAL_SMALLOCX_API */
/* JEMALLOC_PROF enables allocation profiling. */
/* #undef JEMALLOC_PROF */
@ -240,6 +219,12 @@
#define JEMALLOC_INTERNAL_FFSL __builtin_ffsl
#define JEMALLOC_INTERNAL_FFS __builtin_ffs
/*
* popcount*() functions to use for bitmapping.
*/
#define JEMALLOC_INTERNAL_POPCOUNTL __builtin_popcountl
#define JEMALLOC_INTERNAL_POPCOUNT __builtin_popcount
/*
* If defined, explicitly attempt to more uniformly distribute large allocation
* pointer alignments across all cache indices.
@ -252,6 +237,12 @@
*/
/* #undef JEMALLOC_LOG */
/*
* If defined, use readlinkat() (instead of readlink()) to follow
* /etc/malloc_conf.
*/
/* #undef JEMALLOC_READLINKAT */
/*
* Darwin (OS X) uses zones to work around Mach-O symbol override shortcomings.
*/
@ -288,7 +279,7 @@
* MADV_FREE, though typically with higher
* system overhead.
*/
//#define JEMALLOC_PURGE_MADVISE_FREE
#define JEMALLOC_PURGE_MADVISE_FREE
#define JEMALLOC_PURGE_MADVISE_DONTNEED
#define JEMALLOC_PURGE_MADVISE_DONTNEED_ZEROS
@ -370,4 +361,7 @@
*/
#define JEMALLOC_STRERROR_R_RETURNS_CHAR_WITH_GNU_SOURCE
/* Performs additional safety checks when defined. */
/* #undef JEMALLOC_OPT_SAFETY_CHECKS */
#endif /* JEMALLOC_INTERNAL_DEFS_H_ */

View File

@ -21,7 +21,7 @@
# include "jemalloc/jemalloc.h"
#endif
#if (defined(JEMALLOC_OSATOMIC) || defined(JEMALLOC_OSSPIN))
#if defined(JEMALLOC_OSATOMIC)
#include <libkern/OSAtomic.h>
#endif
@ -161,7 +161,26 @@ static const bool config_log =
false
#endif
;
#ifdef JEMALLOC_HAVE_SCHED_GETCPU
/*
* Are extra safety checks enabled; things like checking the size of sized
* deallocations, double-frees, etc.
*/
static const bool config_opt_safety_checks =
#ifdef JEMALLOC_OPT_SAFETY_CHECKS
true
#elif defined(JEMALLOC_DEBUG)
/*
* This lets us only guard safety checks by one flag instead of two; fast
* checks can guard solely by config_opt_safety_checks and run in debug mode
* too.
*/
true
#else
false
#endif
;
#if defined(_WIN32) || defined(JEMALLOC_HAVE_SCHED_GETCPU)
/* Currently percpu_arena depends on sched_getcpu. */
#define JEMALLOC_PERCPU_ARENA
#endif

View File

@ -1,43 +0,0 @@
/* include/jemalloc/jemalloc_defs.h. Generated from jemalloc_defs.h.in by configure. */
/* Defined if __attribute__((...)) syntax is supported. */
#define JEMALLOC_HAVE_ATTR
/* Defined if alloc_size attribute is supported. */
#define JEMALLOC_HAVE_ATTR_ALLOC_SIZE
/* Defined if format(printf, ...) attribute is supported. */
#define JEMALLOC_HAVE_ATTR_FORMAT_PRINTF
/*
* Define overrides for non-standard allocator-related functions if they are
* present on the system.
*/
#define JEMALLOC_OVERRIDE_MEMALIGN
#define JEMALLOC_OVERRIDE_VALLOC
/*
* At least Linux omits the "const" in:
*
* size_t malloc_usable_size(const void *ptr);
*
* Match the operating system's prototype.
*/
#define JEMALLOC_USABLE_SIZE_CONST
/*
* If defined, specify throw() for the public function prototypes when compiling
* with C++. The only justification for this is to match the prototypes that
* glibc defines.
*/
#define JEMALLOC_USE_CXX_THROW
#ifdef _MSC_VER
# ifdef _WIN64
# define LG_SIZEOF_PTR_WIN 3
# else
# define LG_SIZEOF_PTR_WIN 2
# endif
#endif
/* sizeof(void *) == 2^LG_SIZEOF_PTR. */
#define LG_SIZEOF_PTR 3

View File

@ -1,66 +0,0 @@
/*
* The je_ prefix on the following public symbol declarations is an artifact
* of namespace management, and should be omitted in application code unless
* JEMALLOC_NO_DEMANGLE is defined (see jemalloc_mangle.h).
*/
extern JEMALLOC_EXPORT const char *je_malloc_conf;
extern JEMALLOC_EXPORT void (*je_malloc_message)(void *cbopaque,
const char *s);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_malloc(size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE(1);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_calloc(size_t num, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE2(1, 2);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_posix_memalign(void **memptr,
size_t alignment, size_t size) JEMALLOC_CXX_THROW JEMALLOC_ATTR(nonnull(1));
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_aligned_alloc(size_t alignment,
size_t size) JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc)
JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_realloc(void *ptr, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_free(void *ptr)
JEMALLOC_CXX_THROW;
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_mallocx(size_t size, int flags)
JEMALLOC_ATTR(malloc) JEMALLOC_ALLOC_SIZE(1);
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_rallocx(void *ptr, size_t size,
int flags) JEMALLOC_ALLOC_SIZE(2);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_xallocx(void *ptr, size_t size,
size_t extra, int flags);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_sallocx(const void *ptr,
int flags) JEMALLOC_ATTR(pure);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_dallocx(void *ptr, int flags);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_sdallocx(void *ptr, size_t size,
int flags);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_nallocx(size_t size, int flags)
JEMALLOC_ATTR(pure);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctl(const char *name,
void *oldp, size_t *oldlenp, void *newp, size_t newlen);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctlnametomib(const char *name,
size_t *mibp, size_t *miblenp);
JEMALLOC_EXPORT int JEMALLOC_NOTHROW je_mallctlbymib(const size_t *mib,
size_t miblen, void *oldp, size_t *oldlenp, void *newp, size_t newlen);
JEMALLOC_EXPORT void JEMALLOC_NOTHROW je_malloc_stats_print(
void (*write_cb)(void *, const char *), void *je_cbopaque,
const char *opts);
JEMALLOC_EXPORT size_t JEMALLOC_NOTHROW je_malloc_usable_size(
JEMALLOC_USABLE_SIZE_CONST void *ptr) JEMALLOC_CXX_THROW;
#ifdef JEMALLOC_OVERRIDE_MEMALIGN
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_memalign(size_t alignment, size_t size)
JEMALLOC_CXX_THROW JEMALLOC_ATTR(malloc);
#endif
#ifdef JEMALLOC_OVERRIDE_VALLOC
JEMALLOC_EXPORT JEMALLOC_ALLOCATOR JEMALLOC_RESTRICT_RETURN
void JEMALLOC_NOTHROW *je_valloc(size_t size) JEMALLOC_CXX_THROW
JEMALLOC_ATTR(malloc);
#endif

View File

@ -21,7 +21,7 @@ RUN apt-get update \
locales \
ca-certificates \
wget \
tzata \
tzdata \
&& rm -rf \
/var/lib/apt/lists/* \
/var/cache/debconf \

View File

@ -317,7 +317,7 @@ function report
rm -r report ||:
mkdir report report/tmp ||:
rm ./*.{rep,svg} test-times.tsv test-dump.tsv unstable.tsv unstable-query-ids.tsv unstable-query-metrics.tsv changed-perf.tsv unstable-tests.tsv unstable-queries.tsv bad-tests.tsv slow-on-client.tsv all-queries.tsv ||:
rm ./*.{rep,svg} test-times.tsv test-dump.tsv unstable.tsv unstable-query-ids.tsv unstable-query-metrics.tsv changed-perf.tsv unstable-tests.tsv unstable-queries.tsv bad-tests.tsv slow-on-client.tsv all-queries.tsv run-errors.tsv ||:
build_log_column_definitions
@ -434,7 +434,7 @@ create table wall_clock engine Memory as select *
from file('wall-clock-times.tsv', TSV, 'test text, real float, user float, system float');
create table slow_on_client_tsv engine File(TSV, 'report/slow-on-client.tsv') as
select client, server, floor(client/server, 3) p, query_display_name
select client, server, floor(client/server, 3) p, test, query_display_name
from query_time left join query_display_names using (test, query_index)
where p > 1.02 order by p desc;

View File

@ -189,7 +189,7 @@ if args.report == 'main':
slow_on_client_rows = tsvRows('report/slow-on-client.tsv')
error_tests += len(slow_on_client_rows)
printSimpleTable('Slow on client',
['Client time, s', 'Server time, s', 'Ratio', 'Query'],
['Client time, s', 'Server time, s', 'Ratio', 'Test', 'Query'],
slow_on_client_rows)
def print_changes():

View File

@ -37,6 +37,8 @@ The supported formats are:
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✗ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
@ -985,9 +987,9 @@ See also [how to read/write length-delimited protobuf messages in popular langua
## Avro {#data-format-avro}
[Apache Avro](http://avro.apache.org/) is a row-oriented data serialization framework developed within Apaches Hadoop project.
[Apache Avro](https://avro.apache.org/) is a row-oriented data serialization framework developed within Apaches Hadoop project.
ClickHouse Avro format supports reading and writing [Avro data files](http://avro.apache.org/docs/current/spec.html#Object+Container+Files).
ClickHouse Avro format supports reading and writing [Avro data files](https://avro.apache.org/docs/current/spec.html#Object+Container+Files).
### Data Types Matching {#data_types-matching}
@ -1009,7 +1011,7 @@ The table below shows supported data types and how they match ClickHouse [data t
| `long (timestamp-millis)` \* | [DateTime64(3)](../sql-reference/data-types/datetime.md) | `long (timestamp-millis)` \* |
| `long (timestamp-micros)` \* | [DateTime64(6)](../sql-reference/data-types/datetime.md) | `long (timestamp-micros)` \* |
\* [Avro logical types](http://avro.apache.org/docs/current/spec.html#Logical+Types)
\* [Avro logical types](https://avro.apache.org/docs/current/spec.html#Logical+Types)
Unsupported Avro data types: `record` (non-root), `map`
@ -1095,7 +1097,7 @@ SELECT * FROM topic1_stream;
## Parquet {#data-format-parquet}
[Apache Parquet](http://parquet.apache.org/) is a columnar storage format widespread in the Hadoop ecosystem. ClickHouse supports read and write operations for this format.
[Apache Parquet](https://parquet.apache.org/) is a columnar storage format widespread in the Hadoop ecosystem. ClickHouse supports read and write operations for this format.
### Data Types Matching {#data_types-matching-2}
@ -1141,6 +1143,16 @@ $ clickhouse-client --query="SELECT * FROM {some_table} FORMAT Parquet" > {some_
To exchange data with Hadoop, you can use [HDFS table engine](../engines/table-engines/integrations/hdfs.md).
## Arrow {#data-format-arrow}
[Apache Arrow](https://arrow.apache.org/) comes with two built-in columnar storage formats. ClickHouse supports read and write operations for these formats.
`Arrow` is Apache Arrow's "file mode" format. It is designed for in-memory random access.
## ArrowStream {#data-format-arrow-stream}
`ArrowStream` is Apache Arrow's "stream mode" format. It is designed for in-memory stream processing.
## ORC {#data-format-orc}
[Apache ORC](https://orc.apache.org/) is a columnar storage format widespread in the Hadoop ecosystem. You can only insert data in this format to ClickHouse.

View File

@ -41,6 +41,7 @@ toc_title: Adopters
| [Integros](https://integros.com){.favicon} | Platform for video services | Analytics | — | — | [Slides in Russian, May 2019](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup22/strategies.pdf) |
| [Kodiak Data](https://www.kodiakdata.com/){.favicon} | Clouds | Main product | — | — | [Slides in Engish, April 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup13/kodiak_data.pdf) |
| [Kontur](https://kontur.ru){.favicon} | Software Development | Metrics | — | — | [Talk in Russian, November 2018](https://www.youtube.com/watch?v=U4u4Bd0FtrY) |
| [Lawrence Berkeley National Laboratory](https://www.lbl.gov){.favicon} | Research | Traffic analysis | 1 server | 11.8 TiB | [Slides in English, April 2019](https://www.smitasin.com/presentations/2019-04-17_DOE-NSM.pdf) |
| [LifeStreet](https://lifestreet.com/){.favicon} | Ad network | Main product | 75 servers (3 replicas) | 5.27 PiB | [Blog post in Russian, February 2017](https://habr.com/en/post/322620/) |
| [Mail.ru Cloud Solutions](https://mcs.mail.ru/){.favicon} | Cloud services | Main product | — | — | [Article in Russian](https://mcs.mail.ru/help/db-create/clickhouse#) |
| [MessageBird](https://www.messagebird.com){.favicon} | Telecommunications | Statistics | — | — | [Slides in English, November 2018](https://github.com/ClickHouse/clickhouse-presentations/blob/master/meetup20/messagebird.pdf) |

View File

@ -404,6 +404,35 @@ Possible values:
Default value: 0.
## partial_merge_join_optimizations {#partial_merge_join_optimizations}
Disables optimizations in partial merge join algorithm for [JOIN](../../sql-reference/statements/select/join.md) queries.
By default, this setting enables improvements that could lead to wrong results. If you see suspicious results in your queries, disable optimizations by this setting. Optimizations can be different in different versions of the ClickHouse server.
Possible values:
- 0 — Optimizations disabled.
- 1 — Optimizations enabled.
Default value: 1.
## partial_merge_join_rows_in_right_blocks {#partial_merge_join_rows_in_right_blocks}
Limits sizes of right-hand join data blocks in partial merge join algorithm for [JOIN](../../sql-reference/statements/select/join.md) queries.
ClickHouse server:
1. Splits right-hand join data into blocks with up to the specified number of rows.
2. Indexes each block with their minimum and maximum values
3. Unloads prepared blocks to disk if possible.
Possible values:
- Any positive integer. Recommended range of values: [1000, 100000].
Default value: 65536.
## any_join_distinct_right_table_keys {#any_join_distinct_right_table_keys}
Enables legacy ClickHouse server behavior in `ANY INNER|LEFT JOIN` operations.

View File

@ -117,6 +117,10 @@ Returns the part of the domain that includes top-level subdomains up to the “f
For example, `cutToFirstSignificantSubdomain('https://news.yandex.com.tr/') = 'yandex.com.tr'`.
### port(URL[, default_port = 0]) {#port}
Returns the port or `default_port` if there is no port in the URL (or in case of validation error).
### path {#path}
Returns the path. Example: `/top/news.html` The path does not include the query string.

View File

@ -21,7 +21,7 @@ mkdocs-htmlproofer-plugin==0.0.3
mkdocs-macros-plugin==0.4.9
nltk==3.5
nose==1.3.7
protobuf==3.12.1
protobuf==3.12.2
numpy==1.18.4
Pygments==2.5.2
pymdown-extensions==7.1

View File

@ -6,14 +6,9 @@ set(CLICKHOUSE_CLIENT_SOURCES
set(CLICKHOUSE_CLIENT_LINK PRIVATE clickhouse_common_config clickhouse_functions clickhouse_aggregate_functions clickhouse_common_io clickhouse_parsers string_utils ${Boost_PROGRAM_OPTIONS_LIBRARY})
include(CheckSymbolExists)
check_symbol_exists(readpassphrase readpassphrase.h HAVE_READPASSPHRASE)
configure_file(config_client.h.in ${ConfigIncludePath}/config_client.h)
if(NOT HAVE_READPASSPHRASE)
add_subdirectory(readpassphrase)
list(APPEND CLICKHOUSE_CLIENT_LINK PRIVATE readpassphrase)
endif()
# Always use internal readpassphrase
add_subdirectory(readpassphrase)
list(APPEND CLICKHOUSE_CLIENT_LINK PRIVATE readpassphrase)
clickhouse_program_add(client)

View File

@ -39,7 +39,6 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/typeid_cast.h>
#include <Common/Config/ConfigProcessor.h>
#include <Common/config_version.h>
#include <Core/Types.h>
#include <Core/QueryProcessingStage.h>
#include <Core/ExternalTable.h>
@ -77,6 +76,10 @@
#include <common/argsToConfig.h>
#include <Common/TerminalSize.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config_version.h>
#endif
#ifndef __clang__
#pragma GCC optimize("-fno-var-tracking-assignments")
#endif

View File

@ -9,7 +9,7 @@
#include <Common/Exception.h>
#include <common/setTerminalEcho.h>
#include <ext/scope_guard.h>
#include <readpassphrase.h>
#include "readpassphrase/readpassphrase.h"
namespace DB
{

View File

@ -1,3 +0,0 @@
#pragma once
#cmakedefine HAVE_READPASSPHRASE

View File

@ -1,13 +1,7 @@
# wget https://raw.githubusercontent.com/openssh/openssh-portable/master/openbsd-compat/readpassphrase.c
# wget https://raw.githubusercontent.com/openssh/openssh-portable/master/openbsd-compat/readpassphrase.h
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -Wno-unused-result -Wno-reserved-id-macro")
add_library(readpassphrase readpassphrase.c)
configure_file(includes.h.in ${CMAKE_CURRENT_BINARY_DIR}/include/includes.h)
add_library(readpassphrase ${CMAKE_CURRENT_SOURCE_DIR}/readpassphrase.c)
set_target_properties(readpassphrase
PROPERTIES LINKER_LANGUAGE C
)
# . to allow #include <readpassphrase.h>
target_include_directories(readpassphrase PUBLIC . ${CMAKE_CURRENT_BINARY_DIR}/include)
set_target_properties(readpassphrase PROPERTIES LINKER_LANGUAGE C)
target_compile_options(readpassphrase PRIVATE -Wno-unused-result -Wno-reserved-id-macro)

View File

@ -1,6 +1,6 @@
#pragma once
#cmakedefine HAVE_READPASSPHRASE
/* #undef HAVE_READPASSPHRASE */
#if !defined(HAVE_READPASSPHRASE)
# ifndef _PATH_TTY

View File

@ -25,13 +25,11 @@
#include "includes.h"
#ifndef HAVE_READPASSPHRASE
#include <termios.h>
#include <signal.h>
#include <ctype.h>
#include <fcntl.h>
#include <readpassphrase.h>
#include "readpassphrase.h"
#include <errno.h>
#include <string.h>
#include <unistd.h>
@ -193,19 +191,7 @@ restart:
}
//DEF_WEAK(readpassphrase);
#if 0
char *
getpass(const char *prompt)
{
static char buf[_PASSWORD_LEN + 1];
return(readpassphrase(prompt, buf, sizeof(buf), RPP_ECHO_OFF));
}
#endif
static void handler(int s)
{
signo[s] = 1;
}
#endif /* HAVE_READPASSPHRASE */

View File

@ -23,39 +23,22 @@
/* OPENBSD ORIGINAL: include/readpassphrase.h */
#pragma once
// #ifndef _READPASSPHRASE_H_
// #define _READPASSPHRASE_H_
//#include "includes.h"
#include "config_client.h"
// Should not be included on BSD systems, but if it happen...
#ifdef HAVE_READPASSPHRASE
# include_next <readpassphrase.h>
#if defined(__cplusplus)
extern "C" {
#endif
#ifndef HAVE_READPASSPHRASE
# ifdef __cplusplus
extern "C" {
# endif
# define RPP_ECHO_OFF 0x00 /* Turn off echo (default). */
# define RPP_ECHO_ON 0x01 /* Leave echo on. */
# define RPP_REQUIRE_TTY 0x02 /* Fail if there is no tty. */
# define RPP_FORCELOWER 0x04 /* Force input to lower case. */
# define RPP_FORCEUPPER 0x08 /* Force input to upper case. */
# define RPP_SEVENBIT 0x10 /* Strip the high bit from input. */
# define RPP_STDIN 0x20 /* Read from stdin, not /dev/tty */
#define RPP_ECHO_OFF 0x00 /* Turn off echo (default). */
#define RPP_ECHO_ON 0x01 /* Leave echo on. */
#define RPP_REQUIRE_TTY 0x02 /* Fail if there is no tty. */
#define RPP_FORCELOWER 0x04 /* Force input to lower case. */
#define RPP_FORCEUPPER 0x08 /* Force input to upper case. */
#define RPP_SEVENBIT 0x10 /* Strip the high bit from input. */
#define RPP_STDIN 0x20 /* Read from stdin, not /dev/tty */
char * readpassphrase(const char *, char *, size_t, int);
# ifdef __cplusplus
#if defined(__cplusplus)
}
# endif
#endif /* HAVE_READPASSPHRASE */
// #endif /* !_READPASSPHRASE_H_ */
#endif

View File

@ -0,0 +1,7 @@
LIBRARY()
SRCS(
readpassphrase.c
)
END()

View File

@ -8,11 +8,8 @@
#include <string>
#include <utility> /// pair
#if __has_include("config_tools.h")
#include "config_tools.h"
#endif
#if __has_include("config_core.h")
#include "config_core.h"
#if !defined(ARCADIA_BUILD)
# include "config_tools.h"
#endif
#include <Common/StringUtils/StringUtils.h>
@ -22,31 +19,31 @@
/// Universal executable for various clickhouse applications
#if ENABLE_CLICKHOUSE_SERVER || !defined(ENABLE_CLICKHOUSE_SERVER)
#if ENABLE_CLICKHOUSE_SERVER
int mainEntryClickHouseServer(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_CLIENT || !defined(ENABLE_CLICKHOUSE_CLIENT)
#if ENABLE_CLICKHOUSE_CLIENT
int mainEntryClickHouseClient(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_LOCAL || !defined(ENABLE_CLICKHOUSE_LOCAL)
#if ENABLE_CLICKHOUSE_LOCAL
int mainEntryClickHouseLocal(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_BENCHMARK || !defined(ENABLE_CLICKHOUSE_BENCHMARK)
#if ENABLE_CLICKHOUSE_BENCHMARK
int mainEntryClickHouseBenchmark(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG || !defined(ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG)
#if ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG
int mainEntryClickHouseExtractFromConfig(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_COMPRESSOR || !defined(ENABLE_CLICKHOUSE_COMPRESSOR)
#if ENABLE_CLICKHOUSE_COMPRESSOR
int mainEntryClickHouseCompressor(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_FORMAT || !defined(ENABLE_CLICKHOUSE_FORMAT)
#if ENABLE_CLICKHOUSE_FORMAT
int mainEntryClickHouseFormat(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_COPIER || !defined(ENABLE_CLICKHOUSE_COPIER)
#if ENABLE_CLICKHOUSE_COPIER
int mainEntryClickHouseClusterCopier(int argc, char ** argv);
#endif
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
#if ENABLE_CLICKHOUSE_OBFUSCATOR
int mainEntryClickHouseObfuscator(int argc, char ** argv);
#endif
@ -60,31 +57,31 @@ using MainFunc = int (*)(int, char**);
/// Add an item here to register new application
std::pair<const char *, MainFunc> clickhouse_applications[] =
{
#if ENABLE_CLICKHOUSE_LOCAL || !defined(ENABLE_CLICKHOUSE_LOCAL)
#if ENABLE_CLICKHOUSE_LOCAL
{"local", mainEntryClickHouseLocal},
#endif
#if ENABLE_CLICKHOUSE_CLIENT || !defined(ENABLE_CLICKHOUSE_CLIENT)
#if ENABLE_CLICKHOUSE_CLIENT
{"client", mainEntryClickHouseClient},
#endif
#if ENABLE_CLICKHOUSE_BENCHMARK || !defined(ENABLE_CLICKHOUSE_BENCHMARK)
#if ENABLE_CLICKHOUSE_BENCHMARK
{"benchmark", mainEntryClickHouseBenchmark},
#endif
#if ENABLE_CLICKHOUSE_SERVER || !defined(ENABLE_CLICKHOUSE_SERVER)
#if ENABLE_CLICKHOUSE_SERVER
{"server", mainEntryClickHouseServer},
#endif
#if ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG || !defined(ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG)
#if ENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG
{"extract-from-config", mainEntryClickHouseExtractFromConfig},
#endif
#if ENABLE_CLICKHOUSE_COMPRESSOR || !defined(ENABLE_CLICKHOUSE_COMPRESSOR)
#if ENABLE_CLICKHOUSE_COMPRESSOR
{"compressor", mainEntryClickHouseCompressor},
#endif
#if ENABLE_CLICKHOUSE_FORMAT || !defined(ENABLE_CLICKHOUSE_FORMAT)
#if ENABLE_CLICKHOUSE_FORMAT
{"format", mainEntryClickHouseFormat},
#endif
#if ENABLE_CLICKHOUSE_COPIER || !defined(ENABLE_CLICKHOUSE_COPIER)
#if ENABLE_CLICKHOUSE_COPIER
{"copier", mainEntryClickHouseClusterCopier},
#endif
#if ENABLE_CLICKHOUSE_OBFUSCATOR || !defined(ENABLE_CLICKHOUSE_OBFUSCATOR)
#if ENABLE_CLICKHOUSE_OBFUSCATOR
{"obfuscator", mainEntryClickHouseObfuscator},
#endif
};
@ -127,9 +124,10 @@ enum class InstructionFail
SSSE3 = 2,
SSE4_1 = 3,
SSE4_2 = 4,
AVX = 5,
AVX2 = 6,
AVX512 = 7
POPCNT = 5,
AVX = 6,
AVX2 = 7,
AVX512 = 8
};
const char * instructionFailToString(InstructionFail fail)
@ -146,6 +144,8 @@ const char * instructionFailToString(InstructionFail fail)
return "SSE4.1";
case InstructionFail::SSE4_2:
return "SSE4.2";
case InstructionFail::POPCNT:
return "POPCNT";
case InstructionFail::AVX:
return "AVX";
case InstructionFail::AVX2:
@ -189,6 +189,16 @@ void checkRequiredInstructionsImpl(volatile InstructionFail & fail)
__asm__ volatile ("pcmpgtq %%xmm0, %%xmm0" : : : "xmm0");
#endif
/// Defined by -msse4.2
#if defined(__POPCNT__)
fail = InstructionFail::POPCNT;
{
uint64_t a = 0;
uint64_t b = 0;
__asm__ volatile ("popcnt %1, %0" : "=r"(a) :"r"(b) :);
}
#endif
#if defined(__AVX__)
fail = InstructionFail::AVX;
__asm__ volatile ("vaddpd %%ymm0, %%ymm0, %%ymm0" : : : "ymm0");

View File

@ -1,21 +1,6 @@
set(CLICKHOUSE_SERVER_SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/HTTPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/HTTPHandlerFactory.cpp
${CMAKE_CURRENT_SOURCE_DIR}/InterserverIOHTTPHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MetricsTransmitter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/NotFoundHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PrometheusMetricsWriter.cpp
${CMAKE_CURRENT_SOURCE_DIR}/PrometheusRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/ReplicasStatusHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/StaticRequestHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/Server.cpp
${CMAKE_CURRENT_SOURCE_DIR}/TCPHandler.cpp
)
set(CLICKHOUSE_SERVER_SOURCES
${CLICKHOUSE_SERVER_SOURCES}
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandler.cpp
${CMAKE_CURRENT_SOURCE_DIR}/MySQLHandlerFactory.cpp
MetricsTransmitter.cpp
Server.cpp
)
set (CLICKHOUSE_SERVER_LINK

View File

@ -53,13 +53,13 @@
#include <Dictionaries/registerDictionaries.h>
#include <Disks/registerDisks.h>
#include <Common/Config/ConfigReloader.h>
#include "HTTPHandlerFactory.h"
#include <Server/HTTPHandlerFactory.h>
#include "MetricsTransmitter.h"
#include <Common/StatusFile.h>
#include "TCPHandlerFactory.h"
#include <Server/TCPHandlerFactory.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/ThreadFuzzer.h>
#include "MySQLHandlerFactory.h"
#include <Server/MySQLHandlerFactory.h>
#if !defined(ARCADIA_BUILD)
# include "config_core.h"

View File

@ -1,6 +1,6 @@
#pragma once
#include "IServer.h"
#include <Server/IServer.h>
#include <daemon/BaseDaemon.h>

View File

@ -11,19 +11,8 @@ PEERDIR(
SRCS(
clickhouse-server.cpp
HTTPHandler.cpp
HTTPHandlerFactory.cpp
InterserverIOHTTPHandler.cpp
MetricsTransmitter.cpp
MySQLHandler.cpp
MySQLHandlerFactory.cpp
NotFoundHandler.cpp
PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
Server.cpp
TCPHandler.cpp
)
END()

View File

@ -1,3 +1,27 @@
RECURSE(
server
PROGRAM(clickhouse)
CFLAGS(
-DENABLE_CLICKHOUSE_CLIENT
-DENABLE_CLICKHOUSE_EXTRACT_FROM_CONFIG
-DENABLE_CLICKHOUSE_SERVER
)
PEERDIR(
clickhouse/base/daemon
clickhouse/base/loggers
clickhouse/programs/client/readpassphrase
clickhouse/src
)
SRCS(
main.cpp
client/Client.cpp
client/ConnectionParameters.cpp
client/Suggest.cpp
extract-from-config/ExtractFromConfig.cpp
server/Server.cpp
server/MetricsTransmitter.cpp
)
END()

View File

@ -58,6 +58,7 @@ add_subdirectory (TableFunctions)
add_subdirectory (Processors)
add_subdirectory (Formats)
add_subdirectory (Compression)
add_subdirectory (Server)
set(dbms_headers)
@ -145,6 +146,7 @@ add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_client Client)
add_object_library(clickhouse_server Server)
add_object_library(clickhouse_formats Formats)
add_object_library(clickhouse_processors Processors)
add_object_library(clickhouse_processors_executors Processors/Executors)

View File

@ -94,6 +94,12 @@ namespace CurrentMetrics
amount = new_amount;
}
void sub(Value value = 1)
{
what->fetch_sub(value, std::memory_order_relaxed);
amount -= value;
}
/// Subtract value before destructor.
void destroy()
{

View File

@ -25,6 +25,9 @@ CFLAGS (GLOBAL -DDBMS_VERSION_MINOR=0)
CFLAGS (GLOBAL -DDBMS_VERSION_PATCH=0)
CFLAGS (GLOBAL -DVERSION_FULL=\"ClickHouse\")
CFLAGS (GLOBAL -DVERSION_INTEGER=0)
CFLAGS (GLOBAL -DVERSION_MAJOR=0)
CFLAGS (GLOBAL -DVERSION_MINOR=0)
CFLAGS (GLOBAL -DVERSION_PATCH=0)
CFLAGS (GLOBAL -DVERSION_NAME=\"ClickHouse\")
CFLAGS (GLOBAL -DVERSION_OFFICIAL=\"\\\(arcadia\\\)\")
CFLAGS (GLOBAL -DVERSION_REVISION=0)

View File

@ -156,7 +156,7 @@ struct Settings : public SettingsCollection<Settings>
M(SettingUInt64, priority, 0, "Priority of the query. 1 - the highest, higher value - lower priority; 0 - do not use priorities.", 0) \
M(SettingInt64, os_thread_priority, 0, "If non zero - set corresponding 'nice' value for query processing threads. Can be used to adjust query priority for OS scheduler.", 0) \
\
M(SettingBool, log_queries, 0, "Log requests and write the log to the system table.", 0) \
M(SettingBool, log_queries, 1, "Log requests and write the log to the system table.", 0) \
M(SettingLogQueriesType, log_queries_min_type, QueryLogElementType::QUERY_START, "query_log minimal type to log, possible values (from low to high): QUERY_START, QUERY_FINISH, EXCEPTION_BEFORE_START, EXCEPTION_WHILE_PROCESSING.", 0) \
M(SettingUInt64, log_queries_cut_to_length, 100000, "If query length is greater than specified threshold (in bytes), then cut query when writing to query log. Also limit length of printed query in ordinary text log.", 0) \
\

View File

@ -21,19 +21,10 @@ class NullAndDoCopyBlockInputStream : public IBlockInputStream
{
public:
NullAndDoCopyBlockInputStream(const BlockInputStreamPtr & input_, BlockOutputStreamPtr output_)
: input(std::move(input_))
, output(std::move(output_))
{
input_streams.push_back(input_);
output_streams.push_back(output_);
for (auto & input_stream : input_streams)
children.push_back(input_stream);
}
NullAndDoCopyBlockInputStream(const BlockInputStreams & input_, BlockOutputStreams & output_)
: input_streams(input_), output_streams(output_)
{
for (auto & input_stream : input_)
children.push_back(input_stream);
children.push_back(input);
}
/// Suppress readPrefix and readSuffix, because they are called by copyData.
@ -53,16 +44,13 @@ protected:
/// If query was cancelled, it will be processed by child streams.
/// Part of the data will be processed.
if (input_streams.size() == 1 && output_streams.size() == 1)
copyData(*input_streams.at(0), *output_streams.at(0));
else
copyData(input_streams, output_streams);
copyData(*input, *output);
return Block();
}
private:
BlockInputStreams input_streams;
BlockOutputStreams output_streams;
BlockInputStreamPtr input;
BlockOutputStreamPtr output;
};
}

View File

@ -5,6 +5,8 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Columns/ColumnConst.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Storages/TTLMode.h>
#include <Interpreters/Context.h>
namespace DB
{
@ -67,6 +69,32 @@ TTLBlockInputStream::TTLBlockInputStream(
default_expr_list, storage.getColumns().getAllPhysical());
defaults_expression = ExpressionAnalyzer{default_expr_list, syntax_result, storage.global_context}.getActions(true);
}
if (storage.hasRowsTTL() && storage.getRowsTTL().mode == TTLMode::GROUP_BY)
{
current_key_value.resize(storage.getRowsTTL().group_by_keys.size());
ColumnNumbers keys;
for (const auto & key : storage.getRowsTTL().group_by_keys)
keys.push_back(header.getPositionByName(key));
agg_key_columns.resize(storage.getRowsTTL().group_by_keys.size());
AggregateDescriptions aggregates = storage.getRowsTTL().aggregate_descriptions;
for (auto & descr : aggregates)
if (descr.arguments.empty())
for (const auto & name : descr.argument_names)
descr.arguments.push_back(header.getPositionByName(name));
agg_aggregate_columns.resize(storage.getRowsTTL().aggregate_descriptions.size());
const Settings & settings = storage.global_context.getSettingsRef();
Aggregator::Params params(header, keys, aggregates,
false, settings.max_rows_to_group_by, settings.group_by_overflow_mode,
SettingUInt64(0), SettingUInt64(0),
settings.max_bytes_before_external_group_by, settings.empty_result_for_aggregation_by_empty_set,
storage.global_context.getTemporaryVolume(), settings.max_threads, settings.min_free_disk_space_for_temporary_data);
aggregator = std::make_unique<Aggregator>(params);
}
}
bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
@ -77,7 +105,8 @@ bool TTLBlockInputStream::isTTLExpired(time_t ttl) const
Block TTLBlockInputStream::readImpl()
{
/// Skip all data if table ttl is expired for part
if (storage.hasRowsTTL() && isTTLExpired(old_ttl_infos.table_ttl.max))
if (storage.hasRowsTTL() && !storage.getRowsTTL().where_expression &&
storage.getRowsTTL().mode != TTLMode::GROUP_BY && isTTLExpired(old_ttl_infos.table_ttl.max))
{
rows_removed = data_part->rows_count;
return {};
@ -85,7 +114,16 @@ Block TTLBlockInputStream::readImpl()
Block block = children.at(0)->read();
if (!block)
{
if (aggregator && !agg_result.empty())
{
MutableColumns result_columns = header.cloneEmptyColumns();
finalizeAggregates(result_columns);
block = header.cloneWithColumns(std::move(result_columns));
}
return block;
}
if (storage.hasRowsTTL() && (force || isTTLExpired(old_ttl_infos.table_ttl.min)))
removeRowsWithExpiredTableTTL(block);
@ -114,36 +152,148 @@ void TTLBlockInputStream::readSuffixImpl()
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)
{
const auto & rows_ttl = storage.getRowsTTL();
rows_ttl.expression->execute(block);
if (rows_ttl.where_expression)
rows_ttl.where_expression->execute(block);
const IColumn * ttl_column =
block.getByName(rows_ttl.result_column).column.get();
const IColumn * where_result_column = storage.getRowsTTL().where_expression ?
block.getByName(storage.getRowsTTL().where_result_column).column.get() : nullptr;
const auto & column_names = header.getNames();
MutableColumns result_columns;
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
if (!aggregator)
{
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
MutableColumns result_columns;
result_columns.reserve(column_names.size());
for (auto it = column_names.begin(); it != column_names.end(); ++it)
{
const IColumn * values_column = block.getByName(*it).column.get();
MutableColumnPtr result_column = values_column->cloneEmpty();
result_column->reserve(block.rows());
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
if (!isTTLExpired(cur_ttl) || !where_filter_passed)
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
}
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = header.cloneWithColumns(std::move(result_columns));
}
else
{
MutableColumns result_columns = header.cloneEmptyColumns();
MutableColumns aggregate_columns = header.cloneEmptyColumns();
size_t rows_aggregated = 0;
size_t current_key_start = 0;
size_t rows_with_current_key = 0;
for (size_t i = 0; i < block.rows(); ++i)
{
UInt32 cur_ttl = getTimestampByIndex(ttl_column, i);
if (!isTTLExpired(cur_ttl))
bool where_filter_passed = !where_result_column || where_result_column->getBool(i);
bool ttl_expired = isTTLExpired(cur_ttl) && where_filter_passed;
bool same_as_current = true;
for (size_t j = 0; j < storage.getRowsTTL().group_by_keys.size(); ++j)
{
const String & key_column = storage.getRowsTTL().group_by_keys[j];
const IColumn * values_column = block.getByName(key_column).column.get();
if (!same_as_current || (*values_column)[i] != current_key_value[j])
{
values_column->get(i, current_key_value[j]);
same_as_current = false;
}
}
if (!same_as_current)
{
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
finalizeAggregates(result_columns);
current_key_start = rows_aggregated;
rows_with_current_key = 0;
}
if (ttl_expired)
{
++rows_with_current_key;
++rows_aggregated;
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = aggregate_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else
{
new_ttl_infos.table_ttl.update(cur_ttl);
result_column->insertFrom(*values_column, i);
for (const auto & name : column_names)
{
const IColumn * values_column = block.getByName(name).column.get();
auto & column = result_columns[header.getPositionByName(name)];
column->insertFrom(*values_column, i);
}
}
else if (it == column_names.begin())
++rows_removed;
}
result_columns.emplace_back(std::move(result_column));
}
block = header.cloneWithColumns(std::move(result_columns));
if (rows_with_current_key)
calculateAggregates(aggregate_columns, current_key_start, rows_with_current_key);
block = header.cloneWithColumns(std::move(result_columns));
}
}
void TTLBlockInputStream::calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length)
{
Columns aggregate_chunk;
aggregate_chunk.reserve(aggregate_columns.size());
for (const auto & name : header.getNames())
{
const auto & column = aggregate_columns[header.getPositionByName(name)];
ColumnPtr chunk_column = column->cut(start_pos, length);
aggregate_chunk.emplace_back(std::move(chunk_column));
}
aggregator->executeOnBlock(aggregate_chunk, length, agg_result, agg_key_columns,
agg_aggregate_columns, agg_no_more_keys);
}
void TTLBlockInputStream::finalizeAggregates(MutableColumns & result_columns)
{
if (!agg_result.empty())
{
auto aggregated_res = aggregator->convertToBlocks(agg_result, true, 1);
for (auto & agg_block : aggregated_res)
{
for (const auto & it : storage.getRowsTTL().set_parts)
it.expression->execute(agg_block);
for (const auto & name : storage.getRowsTTL().group_by_keys)
{
const IColumn * values_column = agg_block.getByName(name).column.get();
auto & result_column = result_columns[header.getPositionByName(name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
for (const auto & it : storage.getRowsTTL().set_parts)
{
const IColumn * values_column = agg_block.getByName(it.expression_result_column_name).column.get();
auto & result_column = result_columns[header.getPositionByName(it.column_name)];
result_column->insertRangeFrom(*values_column, 0, agg_block.rows());
}
}
}
agg_result.invalidate();
}
void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)

View File

@ -3,6 +3,7 @@
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
#include <Interpreters/Aggregator.h>
#include <common/DateLUT.h>
@ -39,6 +40,13 @@ private:
time_t current_time;
bool force;
std::unique_ptr<Aggregator> aggregator;
std::vector<Field> current_key_value;
AggregatedDataVariants agg_result;
ColumnRawPtrs agg_key_columns;
Aggregator::AggregateColumns agg_aggregate_columns;
bool agg_no_more_keys = false;
IMergeTreeDataPart::TTLInfos old_ttl_infos;
IMergeTreeDataPart::TTLInfos new_ttl_infos;
NameSet empty_columns;
@ -59,6 +67,12 @@ private:
/// Removes rows with expired table ttl and computes new ttl_infos for part
void removeRowsWithExpiredTableTTL(Block & block);
// Calculate aggregates of aggregate_columns into agg_result
void calculateAggregates(const MutableColumns & aggregate_columns, size_t start_pos, size_t length);
/// Finalize agg_result into result_columns
void finalizeAggregates(MutableColumns & result_columns);
/// Updates TTL for moves
void updateMovesTTL(Block & block);

View File

@ -1,9 +1,6 @@
#include <thread>
#include <DataStreams/IBlockInputStream.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/copyData.h>
#include <DataStreams/ParallelInputsProcessor.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/ThreadPool.h>
@ -55,79 +52,6 @@ void copyDataImpl(IBlockInputStream & from, IBlockOutputStream & to, TCancelCall
inline void doNothing(const Block &) {}
namespace
{
struct ParallelInsertsHandler
{
using CencellationHook = std::function<void()>;
explicit ParallelInsertsHandler(BlockOutputStreams & output_streams, CencellationHook cancellation_hook_, size_t num_threads)
: outputs(output_streams.size()), cancellation_hook(std::move(cancellation_hook_))
{
exceptions.resize(num_threads);
for (auto & output : output_streams)
outputs.push(output.get());
}
void onBlock(Block & block, size_t /*thread_num*/)
{
IBlockOutputStream * out = nullptr;
outputs.pop(out);
out->write(block);
outputs.push(out);
}
void onFinishThread(size_t /*thread_num*/) {}
void onFinish() {}
void onException(std::exception_ptr & exception, size_t thread_num)
{
exceptions[thread_num] = exception;
cancellation_hook();
}
void rethrowFirstException()
{
for (auto & exception : exceptions)
if (exception)
std::rethrow_exception(exception);
}
ConcurrentBoundedQueue<IBlockOutputStream *> outputs;
std::vector<std::exception_ptr> exceptions;
CencellationHook cancellation_hook;
};
}
static void copyDataImpl(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
for (auto & output : outputs)
output->writePrefix();
using Processor = ParallelInputsProcessor<ParallelInsertsHandler>;
Processor * processor_ptr = nullptr;
ParallelInsertsHandler handler(outputs, [&processor_ptr]() { processor_ptr->cancel(false); }, inputs.size());
ParallelInputsProcessor<ParallelInsertsHandler> processor(inputs, nullptr, inputs.size(), handler);
processor_ptr = &processor;
processor.process();
processor.wait();
handler.rethrowFirstException();
/// readPrefix is called in ParallelInputsProcessor.
for (auto & input : inputs)
input->readSuffix();
for (auto & output : outputs)
output->writeSuffix();
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled)
{
auto is_cancelled_pred = [is_cancelled] ()
@ -138,11 +62,6 @@ void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<boo
copyDataImpl(from, to, is_cancelled_pred, doNothing);
}
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs)
{
copyDataImpl(inputs, outputs);
}
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled)
{
copyDataImpl(from, to, is_cancelled, doNothing);

View File

@ -16,8 +16,6 @@ class Block;
*/
void copyData(IBlockInputStream & from, IBlockOutputStream & to, std::atomic<bool> * is_cancelled = nullptr);
void copyData(BlockInputStreams & inputs, BlockOutputStreams & outputs);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled);
void copyData(IBlockInputStream & from, IBlockOutputStream & to, const std::function<bool()> & is_cancelled,

129
src/Functions/URL/port.cpp Normal file
View File

@ -0,0 +1,129 @@
#include <Functions/FunctionFactory.h>
#include <Functions/IFunctionImpl.h>
#include <Common/StringUtils/StringUtils.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include "domain.h"
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct FunctionPort : public IFunction
{
static constexpr auto name = "port";
static FunctionPtr create(const Context &) { return std::make_shared<FunctionPort>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForConstants() const override { return true; }
ColumnNumbers getArgumentsThatAreAlwaysConstant() const override { return {1}; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() != 1 && arguments.size() != 2)
throw Exception("Number of arguments for function " + getName() + " doesn't match: passed "
+ std::to_string(arguments.size()) + ", should be 1 or 2",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
if (!WhichDataType(arguments[0].type).isString())
throw Exception("Illegal type " + arguments[0].type->getName() + " of first argument of function " + getName() + ". Must be String.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (arguments.size() == 2 && !WhichDataType(arguments[1].type).isUInt16())
throw Exception("Illegal type " + arguments[1].type->getName() + " of second argument of function " + getName() + ". Must be UInt16.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt16>();
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t) override
{
UInt16 default_port = 0;
if (arguments.size() == 2)
{
const auto * port_column = checkAndGetColumn<ColumnConst>(block.getByPosition(arguments[1]).column.get());
if (!port_column)
throw Exception("Second argument for function " + getName() + " must be constant UInt16", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
default_port = port_column->getValue<UInt16>();
}
const ColumnPtr url_column = block.getByPosition(arguments[0]).column;
if (const ColumnString * url_strs = checkAndGetColumn<ColumnString>(url_column.get()))
{
auto col_res = ColumnVector<UInt16>::create();
typename ColumnVector<UInt16>::Container & vec_res = col_res->getData();
vec_res.resize(url_column->size());
vector(default_port, url_strs->getChars(), url_strs->getOffsets(), vec_res);
block.getByPosition(result).column = std::move(col_res);
}
else
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
private:
static void vector(UInt16 default_port, const ColumnString::Chars & data, const ColumnString::Offsets & offsets, PaddedPODArray<UInt16> & res)
{
size_t size = offsets.size();
ColumnString::Offset prev_offset = 0;
for (size_t i = 0; i < size; ++i)
{
res[i] = extractPort(default_port, data, prev_offset, offsets[i] - prev_offset - 1);
prev_offset = offsets[i];
}
}
static UInt16 extractPort(UInt16 default_port, const ColumnString::Chars & buf, size_t offset, size_t size)
{
const char * p = reinterpret_cast<const char *>(&buf[0]) + offset;
const char * end = p + size;
StringRef host = getURLHost(p, size);
if (!host.size)
return default_port;
if (host.size == size)
return default_port;
p = host.data + host.size;
if (*p++ != ':')
return default_port;
Int64 port = default_port;
while (p < end)
{
if (*p == '/')
break;
if (!isNumericASCII(*p))
return default_port;
port = (port * 10) + (*p - '0');
if (port < 0 || port > UInt16(-1))
return default_port;
++p;
}
return port;
}
};
void registerFunctionPort(FunctionFactory & factory)
{
factory.registerFunction<FunctionPort>();
}
}

View File

@ -8,6 +8,7 @@ void registerFunctionDomain(FunctionFactory & factory);
void registerFunctionDomainWithoutWWW(FunctionFactory & factory);
void registerFunctionFirstSignificantSubdomain(FunctionFactory & factory);
void registerFunctionTopLevelDomain(FunctionFactory & factory);
void registerFunctionPort(FunctionFactory & factory);
void registerFunctionPath(FunctionFactory & factory);
void registerFunctionPathFull(FunctionFactory & factory);
void registerFunctionQueryString(FunctionFactory & factory);
@ -33,6 +34,7 @@ void registerFunctionsURL(FunctionFactory & factory)
registerFunctionDomainWithoutWWW(factory);
registerFunctionFirstSignificantSubdomain(factory);
registerFunctionTopLevelDomain(factory);
registerFunctionPort(factory);
registerFunctionPath(factory);
registerFunctionPathFull(factory);
registerFunctionQueryString(factory);

View File

@ -425,6 +425,7 @@ SRCS(
URL/path.cpp
URL/pathFull.cpp
URL/protocol.cpp
URL/port.cpp
URL/queryStringAndFragment.cpp
URL/queryString.cpp
URL/registerFunctionsURL.cpp

View File

@ -192,61 +192,65 @@ void ExpressionAnalyzer::analyzeAggregation()
if (has_aggregation)
{
getSelectQuery(); /// assertSelect()
/// Find out aggregation keys.
if (select_query->groupBy())
if (select_query)
{
NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
if (select_query->groupBy())
{
ssize_t size = group_asts.size();
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto & block = temp_actions->getSampleBlock();
if (!block.has(column_name))
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
const auto & col = block.getByName(column_name);
/// Constant expressions have non-null column pointer at this stage.
if (col.column && isColumnConst(*col.column))
NameSet unique_keys;
ASTs & group_asts = select_query->groupBy()->children;
for (ssize_t i = 0; i < ssize_t(group_asts.size()); ++i)
{
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
ssize_t size = group_asts.size();
getRootActionsNoMakeSet(group_asts[i], true, temp_actions, false);
const auto & column_name = group_asts[i]->getColumnName();
const auto & block = temp_actions->getSampleBlock();
if (!block.has(column_name))
throw Exception("Unknown identifier (in GROUP BY): " + column_name, ErrorCodes::UNKNOWN_IDENTIFIER);
const auto & col = block.getByName(column_name);
/// Constant expressions have non-null column pointer at this stage.
if (col.column && isColumnConst(*col.column))
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
/// But don't remove last key column if no aggregate functions, otherwise aggregation will not work.
if (!aggregate_descriptions.empty() || size > 1)
{
if (i + 1 < static_cast<ssize_t>(size))
group_asts[i] = std::move(group_asts.back());
group_asts.pop_back();
group_asts.pop_back();
--i;
continue;
--i;
continue;
}
}
NameAndTypePair key{column_name, col.type};
/// Aggregation keys are uniqued.
if (!unique_keys.count(key.name))
{
unique_keys.insert(key.name);
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
}
}
NameAndTypePair key{column_name, col.type};
/// Aggregation keys are uniqued.
if (!unique_keys.count(key.name))
if (group_asts.empty())
{
unique_keys.insert(key.name);
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
has_aggregation = select_query->having() || !aggregate_descriptions.empty();
}
}
if (group_asts.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
has_aggregation = select_query->having() || !aggregate_descriptions.empty();
}
}
else
aggregated_columns = temp_actions->getSampleBlock().getNamesAndTypesList();
for (const auto & desc : aggregate_descriptions)
aggregated_columns.emplace_back(desc.column_name, desc.function->getReturnType());
@ -926,7 +930,7 @@ void ExpressionAnalyzer::appendExpression(ExpressionActionsChain & chain, const
ExpressionActionsPtr ExpressionAnalyzer::getActions(bool add_aliases, bool project_result)
{
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(sourceColumns(), context);
ExpressionActionsPtr actions = std::make_shared<ExpressionActions>(aggregated_columns, context);
NamesWithAliases result_columns;
Names result_names;

View File

@ -28,6 +28,11 @@
#include <Storages/StorageDistributed.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/checkStackSize.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/NullSink.h>
#include <Processors/Transforms/ConvertingTransform.h>
#include <Processors/Sources/SinkToOutputStream.h>
#include <Processors/ConcatProcessor.h>
namespace DB
@ -117,8 +122,6 @@ BlockIO InterpreterInsertQuery::execute()
if (!query.table_function)
context.checkAccess(AccessType::INSERT, query.table_id, query_sample_block.getNames());
BlockInputStreams in_streams;
BlockOutputStreams out_streams;
bool is_distributed_insert_select = false;
if (query.select && table->isRemote() && settings.parallel_distributed_insert_select)
@ -159,6 +162,8 @@ BlockIO InterpreterInsertQuery::execute()
const auto & cluster = storage_src->getCluster();
const auto & shards_info = cluster->getShardsInfo();
std::vector<QueryPipeline> pipelines;
String new_query_str = queryToString(new_query);
for (size_t shard_index : ext::range(0, shards_info.size()))
{
@ -166,8 +171,7 @@ BlockIO InterpreterInsertQuery::execute()
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, context);
auto block_io = interpreter.execute();
in_streams.push_back(block_io.in);
pipelines.emplace_back(interpreter.execute().pipeline);
}
else
{
@ -179,13 +183,20 @@ BlockIO InterpreterInsertQuery::execute()
/// INSERT SELECT query returns empty block
auto in_stream = std::make_shared<RemoteBlockInputStream>(std::move(connections), new_query_str, Block{}, context);
in_streams.push_back(in_stream);
pipelines.emplace_back();
pipelines.back().init(Pipe(std::make_shared<SourceFromInputStream>(std::move(in_stream))));
pipelines.back().setSinks([](const Block & header, QueryPipeline::StreamType) -> ProcessorPtr
{
return std::make_shared<EmptySink>(header);
});
}
out_streams.push_back(std::make_shared<NullBlockOutputStream>(Block()));
}
res.pipeline.unitePipelines(std::move(pipelines), {});
}
}
BlockOutputStreams out_streams;
if (!is_distributed_insert_select || query.watch)
{
size_t out_streams_size = 1;
@ -193,27 +204,21 @@ BlockIO InterpreterInsertQuery::execute()
{
/// Passing 1 as subquery_depth will disable limiting size of intermediate result.
InterpreterSelectWithUnionQuery interpreter_select{ query.select, context, SelectQueryOptions(QueryProcessingStage::Complete, 1)};
res.pipeline = interpreter_select.executeWithProcessors();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
{
in_streams = interpreter_select.executeWithMultipleStreams(res.pipeline);
out_streams_size = std::min(size_t(settings.max_insert_threads), in_streams.size());
}
out_streams_size = std::min(size_t(settings.max_insert_threads), res.pipeline.getNumStreams());
if (out_streams_size == 1)
res.pipeline.addPipe({std::make_shared<ConcatProcessor>(res.pipeline.getHeader(), res.pipeline.getNumStreams())});
else
{
res = interpreter_select.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
}
res.pipeline.resize(out_streams_size);
}
else if (query.watch)
{
InterpreterWatchQuery interpreter_watch{ query.watch, context };
res = interpreter_watch.execute();
in_streams.emplace_back(res.in);
res.in = nullptr;
res.out = nullptr;
res.pipeline.init(Pipe(std::make_shared<SourceFromInputStream>(std::move(res.in))));
}
for (size_t i = 0; i < out_streams_size; i++)
@ -256,27 +261,35 @@ BlockIO InterpreterInsertQuery::execute()
}
/// What type of query: INSERT or INSERT SELECT or INSERT WATCH?
if (query.select || query.watch)
if (is_distributed_insert_select)
{
for (auto & in_stream : in_streams)
{
in_stream = std::make_shared<ConvertingBlockInputStream>(
in_stream, out_streams.at(0)->getHeader(), ConvertingBlockInputStream::MatchColumnsMode::Position);
}
/// Pipeline was already built.
}
else if (query.select || query.watch)
{
const auto & header = out_streams.at(0)->getHeader();
Block in_header = in_streams.at(0)->getHeader();
if (in_streams.size() > 1)
res.pipeline.addSimpleTransform([&](const Block & in_header) -> ProcessorPtr
{
for (size_t i = 1; i < in_streams.size(); ++i)
assertBlocksHaveEqualStructure(in_streams[i]->getHeader(), in_header, query.select ? "INSERT SELECT" : "INSERT WATCH");
}
return std::make_shared<ConvertingTransform>(in_header, header,
ConvertingTransform::MatchColumnsMode::Position);
});
res.in = std::make_shared<NullAndDoCopyBlockInputStream>(in_streams, out_streams);
res.pipeline.setSinks([&](const Block &, QueryPipeline::StreamType type) -> ProcessorPtr
{
if (type != QueryPipeline::StreamType::Main)
return nullptr;
auto stream = std::move(out_streams.back());
out_streams.pop_back();
return std::make_shared<SinkToOutputStream>(std::move(stream));
});
if (!allow_materialized)
{
for (const auto & column : table->getColumns())
if (column.default_desc.kind == ColumnDefaultKind::Materialized && in_header.has(column.name))
if (column.default_desc.kind == ColumnDefaultKind::Materialized && header.has(column.name))
throw Exception("Cannot insert column " + column.name + ", because it is MATERIALIZED column.", ErrorCodes::ILLEGAL_COLUMN);
}
}
@ -288,6 +301,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else
res.out = std::move(out_streams.at(0));
res.pipeline.addStorageHolder(table);
return res;

View File

@ -839,7 +839,7 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyzeSelect(
return std::make_shared<const SyntaxAnalyzerResult>(result);
}
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage) const
SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTypesList & source_columns, ConstStoragePtr storage, bool allow_aggregations) const
{
if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
@ -855,7 +855,20 @@ SyntaxAnalyzerResultPtr SyntaxAnalyzer::analyze(ASTPtr & query, const NamesAndTy
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_miltiif);
assertNoAggregates(query, "in wrong place");
if (allow_aggregations)
{
GetAggregatesVisitor::Data data;
GetAggregatesVisitor(data).visit(query);
/// There can not be other aggregate functions within the aggregate functions.
for (const ASTFunction * node : data.aggregates)
for (auto & arg : node->arguments->children)
assertNoAggregates(arg, "inside another aggregate function");
result.aggregates = data.aggregates;
}
else
assertNoAggregates(query, "in wrong place");
result.collectUsedColumns(query);
return std::make_shared<const SyntaxAnalyzerResult>(result);
}

View File

@ -86,7 +86,7 @@ public:
{}
/// Analyze and rewrite not select query
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}) const;
SyntaxAnalyzerResultPtr analyze(ASTPtr & query, const NamesAndTypesList & source_columns_, ConstStoragePtr storage = {}, bool allow_aggregations = false) const;
/// Analyze and rewrite select query
SyntaxAnalyzerResultPtr analyzeSelect(

View File

@ -363,6 +363,9 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
else
res = interpreter->execute();
if (res.pipeline.initialized())
use_processors = true;
if (const auto * insert_interpreter = typeid_cast<const InterpreterInsertQuery *>(&*interpreter))
{
/// Save insertion table (not table function). TODO: support remote() table function.
@ -390,7 +393,7 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
/// Limits apply only to the final result.
pipeline.setProgressCallback(context.getProgressCallback());
pipeline.setProcessListElement(context.getProcessListElement());
if (stage == QueryProcessingStage::Complete)
if (stage == QueryProcessingStage::Complete && !pipeline.isCompleted())
{
pipeline.resize(1);
pipeline.addSimpleTransform([&](const Block & header)
@ -771,29 +774,36 @@ void executeQuery(
if (ast_query_with_output && ast_query_with_output->settings_ast)
InterpreterSetQuery(ast_query_with_output->settings_ast, context).executeForCurrentContext();
pipeline.addSimpleTransform([](const Block & header)
if (!pipeline.isCompleted())
{
return std::make_shared<MaterializingTransform>(header);
});
pipeline.addSimpleTransform([](const Block & header)
{
return std::make_shared<MaterializingTransform>(header);
});
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
out->setAutoFlush();
auto out = context.getOutputFormatProcessor(format_name, *out_buf, pipeline.getHeader());
out->setAutoFlush();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context.getProgressCallback();
/// Save previous progress callback if any. TODO Do it more conveniently.
auto previous_progress_callback = context.getProgressCallback();
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
/// NOTE Progress callback takes shared ownership of 'out'.
pipeline.setProgressCallback([out, previous_progress_callback] (const Progress & progress)
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_result_details)
set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
pipeline.setOutputFormat(std::move(out));
}
else
{
if (previous_progress_callback)
previous_progress_callback(progress);
out->onProgress(progress);
});
if (set_result_details)
set_result_details(context.getClientInfo().current_query_id, out->getContentType(), format_name, DateLUT::instance().getTimeZone());
pipeline.setOutput(std::move(out));
pipeline.setProgressCallback(context.getProgressCallback());
}
{
auto executor = pipeline.execute();

View File

@ -7,21 +7,90 @@
namespace DB
{
ASTPtr ASTTTLElement::clone() const
{
auto clone = std::make_shared<ASTTTLElement>(*this);
clone->children.clear();
clone->ttl_expr_pos = -1;
clone->where_expr_pos = -1;
clone->setExpression(clone->ttl_expr_pos, getExpression(ttl_expr_pos, true));
clone->setExpression(clone->where_expr_pos, getExpression(where_expr_pos, true));
for (auto & expr : clone->group_by_key)
expr = expr->clone();
for (auto & [name, expr] : clone->group_by_aggregations)
expr = expr->clone();
return clone;
}
void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
children.front()->formatImpl(settings, state, frame);
if (destination_type == DataDestinationType::DISK)
ttl()->formatImpl(settings, state, frame);
if (mode == TTLMode::MOVE && destination_type == DataDestinationType::DISK)
{
settings.ostr << " TO DISK " << quoteString(destination_name);
}
else if (destination_type == DataDestinationType::VOLUME)
else if (mode == TTLMode::MOVE && destination_type == DataDestinationType::VOLUME)
{
settings.ostr << " TO VOLUME " << quoteString(destination_name);
}
else if (destination_type == DataDestinationType::DELETE)
else if (mode == TTLMode::GROUP_BY)
{
settings.ostr << " GROUP BY ";
for (auto it = group_by_key.begin(); it != group_by_key.end(); ++it)
{
if (it != group_by_key.begin())
settings.ostr << ", ";
(*it)->formatImpl(settings, state, frame);
}
if (!group_by_aggregations.empty())
{
settings.ostr << " SET ";
for (auto it = group_by_aggregations.begin(); it != group_by_aggregations.end(); ++it)
{
if (it != group_by_aggregations.begin())
settings.ostr << ", ";
settings.ostr << it->first << " = ";
it->second->formatImpl(settings, state, frame);
}
}
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.
}
if (where())
{
settings.ostr << " WHERE ";
where()->formatImpl(settings, state, frame);
}
}
void ASTTTLElement::setExpression(int & pos, ASTPtr && ast)
{
if (ast)
{
if (pos == -1)
{
pos = children.size();
children.emplace_back(ast);
}
else
children[pos] = ast;
}
else if (pos != -1)
{
children[pos] = ASTPtr{};
pos = -1;
}
}
ASTPtr ASTTTLElement::getExpression(int pos, bool clone) const
{
return pos != -1 ? (clone ? children[pos]->clone() : children[pos]) : ASTPtr{};
}
}

View File

@ -2,35 +2,53 @@
#include <Parsers/IAST.h>
#include <Storages/DataDestinationType.h>
#include <Storages/TTLMode.h>
namespace DB
{
/** Element of TTL expression.
*/
class ASTTTLElement : public IAST
{
public:
TTLMode mode;
DataDestinationType destination_type;
String destination_name;
ASTTTLElement(DataDestinationType destination_type_, const String & destination_name_)
: destination_type(destination_type_)
ASTs group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_)
: mode(mode_)
, destination_type(destination_type_)
, destination_name(destination_name_)
, ttl_expr_pos(-1)
, where_expr_pos(-1)
{
}
String getID(char) const override { return "TTLElement"; }
ASTPtr clone() const override
{
auto clone = std::make_shared<ASTTTLElement>(*this);
clone->cloneChildren();
return clone;
}
ASTPtr clone() const override;
const ASTPtr ttl() const { return getExpression(ttl_expr_pos); }
const ASTPtr where() const { return getExpression(where_expr_pos); }
void setTTL(ASTPtr && ast) { setExpression(ttl_expr_pos, std::forward<ASTPtr>(ast)); }
void setWhere(ASTPtr && ast) { setExpression(where_expr_pos, std::forward<ASTPtr>(ast)); }
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
private:
int ttl_expr_pos;
int where_expr_pos;
private:
void setExpression(int & pos, ASTPtr && ast);
ASTPtr getExpression(int pos, bool clone = false) const;
};
}

View File

@ -1455,23 +1455,50 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_to_disk("TO DISK");
ParserKeyword s_to_volume("TO VOLUME");
ParserKeyword s_delete("DELETE");
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_set("SET");
ParserToken s_comma(TokenType::Comma);
ParserToken s_eq(TokenType::Equals);
ParserIdentifier parser_identifier;
ParserStringLiteral parser_string_literal;
ParserExpression parser_exp;
ParserExpressionList parser_expression_list(false);
ASTPtr expr_elem;
if (!parser_exp.parse(pos, expr_elem, expected))
ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
return false;
TTLMode mode;
DataDestinationType destination_type = DataDestinationType::DELETE;
String destination_name;
if (s_to_disk.ignore(pos))
destination_type = DataDestinationType::DISK;
else if (s_to_volume.ignore(pos))
destination_type = DataDestinationType::VOLUME;
else
s_delete.ignore(pos);
if (destination_type == DataDestinationType::DISK || destination_type == DataDestinationType::VOLUME)
if (s_to_disk.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = DataDestinationType::DISK;
}
else if (s_to_volume.ignore(pos))
{
mode = TTLMode::MOVE;
destination_type = DataDestinationType::VOLUME;
}
else if (s_group_by.ignore(pos))
{
mode = TTLMode::GROUP_BY;
}
else
{
s_delete.ignore(pos);
mode = TTLMode::DELETE;
}
ASTPtr where_expr;
ASTPtr ast_group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
if (mode == TTLMode::MOVE)
{
ASTPtr ast_space_name;
if (!parser_string_literal.parse(pos, ast_space_name, expected))
@ -1479,10 +1506,52 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
destination_name = ast_space_name->as<ASTLiteral &>().value.get<const String &>();
}
else if (mode == TTLMode::GROUP_BY)
{
if (!parser_expression_list.parse(pos, ast_group_by_key, expected))
return false;
node = std::make_shared<ASTTTLElement>(destination_type, destination_name);
node->children.push_back(expr_elem);
if (s_set.ignore(pos))
{
while (true)
{
if (!group_by_aggregations.empty() && !s_comma.ignore(pos))
break;
ASTPtr name;
ASTPtr value;
if (!parser_identifier.parse(pos, name, expected))
return false;
if (!s_eq.ignore(pos))
return false;
if (!parser_exp.parse(pos, value, expected))
return false;
String name_str;
if (!tryGetIdentifierNameInto(name, name_str))
return false;
group_by_aggregations.emplace_back(name_str, std::move(value));
}
}
}
else if (mode == TTLMode::DELETE && s_where.ignore(pos))
{
if (!parser_exp.parse(pos, where_expr, expected))
return false;
}
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name);
ttl_element->setTTL(std::move(ttl_expr));
if (where_expr)
ttl_element->setWhere(std::move(where_expr));
if (mode == TTLMode::GROUP_BY)
{
ttl_element->group_by_key = std::move(ast_group_by_key->children);
ttl_element->group_by_aggregations = std::move(group_by_aggregations);
}
node = ttl_element;
return true;
}

View File

@ -14,9 +14,10 @@ struct PullingAsyncPipelineExecutor::Data
{
PipelineExecutorPtr executor;
std::exception_ptr exception;
std::atomic_bool is_executed = false;
std::atomic_bool is_finished = false;
std::atomic_bool has_exception = false;
ThreadFromGlobalPool thread;
Poco::Event finish_event;
~Data()
{
@ -36,8 +37,11 @@ struct PullingAsyncPipelineExecutor::Data
PullingAsyncPipelineExecutor::PullingAsyncPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutput(lazy_format);
if (!pipeline.isCompleted())
{
lazy_format = std::make_shared<LazyOutputFormat>(pipeline.getHeader());
pipeline.setOutputFormat(lazy_format);
}
}
PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
@ -54,7 +58,8 @@ PullingAsyncPipelineExecutor::~PullingAsyncPipelineExecutor()
const Block & PullingAsyncPipelineExecutor::getHeader() const
{
return lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader();
return lazy_format ? lazy_format->getPort(IOutputFormat::PortKind::Main).getHeader()
: pipeline.getHeader(); /// Empty.
}
static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGroupStatusPtr thread_group, size_t num_threads)
@ -78,6 +83,9 @@ static void threadFunction(PullingAsyncPipelineExecutor::Data & data, ThreadGrou
data.exception = std::current_exception();
data.has_exception = true;
}
data.is_finished = true;
data.finish_event.set();
}
@ -99,20 +107,33 @@ bool PullingAsyncPipelineExecutor::pull(Chunk & chunk, uint64_t milliseconds)
if (data->has_exception)
{
/// Finish lazy format in case of exception. Otherwise thread.join() may hung.
lazy_format->finish();
if (lazy_format)
lazy_format->finish();
data->has_exception = false;
std::rethrow_exception(std::move(data->exception));
}
if (lazy_format->isFinished())
bool is_execution_finished = lazy_format ? lazy_format->isFinished()
: data->is_finished.load();
if (is_execution_finished)
{
data->is_executed = true;
/// If lazy format is finished, we don't cancel pipeline but wait for main thread to be finished.
data->is_finished = true;
/// Wait thread ant rethrow exception if any.
cancel();
return false;
}
chunk = lazy_format->getChunk(milliseconds);
if (lazy_format)
{
chunk = lazy_format->getChunk(milliseconds);
return true;
}
chunk.clear();
data->finish_event.tryWait(milliseconds);
return true;
}
@ -147,11 +168,11 @@ bool PullingAsyncPipelineExecutor::pull(Block & block, uint64_t milliseconds)
void PullingAsyncPipelineExecutor::cancel()
{
/// Cancel execution if it wasn't finished.
if (data && !data->is_executed && data->executor)
if (data && !data->is_finished && data->executor)
data->executor->cancel();
/// Finish lazy format. Otherwise thread.join() may hung.
if (!lazy_format->isFinished())
if (lazy_format && !lazy_format->isFinished())
lazy_format->finish();
/// Join thread here to wait for possible exception.
@ -165,12 +186,14 @@ void PullingAsyncPipelineExecutor::cancel()
Chunk PullingAsyncPipelineExecutor::getTotals()
{
return lazy_format->getTotals();
return lazy_format ? lazy_format->getTotals()
: Chunk();
}
Chunk PullingAsyncPipelineExecutor::getExtremes()
{
return lazy_format->getExtremes();
return lazy_format ? lazy_format->getExtremes()
: Chunk();
}
Block PullingAsyncPipelineExecutor::getTotalsBlock()
@ -197,7 +220,9 @@ Block PullingAsyncPipelineExecutor::getExtremesBlock()
BlockStreamProfileInfo & PullingAsyncPipelineExecutor::getProfileInfo()
{
return lazy_format->getProfileInfo();
static BlockStreamProfileInfo profile_info;
return lazy_format ? lazy_format->getProfileInfo()
: profile_info;
}
}

View File

@ -9,7 +9,7 @@ namespace DB
PullingPipelineExecutor::PullingPipelineExecutor(QueryPipeline & pipeline_) : pipeline(pipeline_)
{
pulling_format = std::make_shared<PullingOutputFormat>(pipeline.getHeader(), has_data_flag);
pipeline.setOutput(pulling_format);
pipeline.setOutputFormat(pulling_format);
}
PullingPipelineExecutor::~PullingPipelineExecutor()

View File

@ -16,12 +16,12 @@ namespace DB
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_EXCEPTION;
extern const int CANNOT_READ_ALL_DATA;
}
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_)
: IInputFormat(header_, in_)
ArrowBlockInputFormat::ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_)
: IInputFormat(header_, in_), stream{stream_}
{
prepareReader();
}
@ -30,12 +30,23 @@ Chunk ArrowBlockInputFormat::generate()
{
Chunk res;
const Block & header = getPort().getHeader();
if (record_batch_current >= record_batch_total)
return res;
std::vector<std::shared_ptr<arrow::RecordBatch>> single_batch(1);
arrow::Status read_status = file_reader->ReadRecordBatch(record_batch_current, &single_batch[0]);
arrow::Status read_status;
if (stream)
{
read_status = stream_reader->ReadNext(&single_batch[0]);
if (!single_batch[0])
return res;
}
else
{
if (record_batch_current >= record_batch_total)
return res;
read_status = file_reader->ReadRecordBatch(record_batch_current, &single_batch[0]);
}
if (!read_status.ok())
throw Exception{"Error while reading batch of Arrow data: " + read_status.ToString(),
ErrorCodes::CANNOT_READ_ALL_DATA};
@ -57,30 +68,54 @@ void ArrowBlockInputFormat::resetParser()
{
IInputFormat::resetParser();
file_reader.reset();
if (stream)
stream_reader.reset();
else
file_reader.reset();
prepareReader();
}
void ArrowBlockInputFormat::prepareReader()
{
arrow::Status open_status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(in), &file_reader);
if (!open_status.ok())
throw Exception(open_status.ToString(), ErrorCodes::BAD_ARGUMENTS);
record_batch_total = file_reader->num_record_batches();
arrow::Status status;
if (stream)
status = arrow::ipc::RecordBatchStreamReader::Open(asArrowFile(in), &stream_reader);
else
status = arrow::ipc::RecordBatchFileReader::Open(asArrowFile(in), &file_reader);
if (!status.ok())
throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
if (stream)
record_batch_total = -1;
else
record_batch_total = file_reader->num_record_batches();
record_batch_current = 0;
}
void registerInputFormatProcessorArrow(FormatFactory &factory)
{
factory.registerInputFormatProcessor(
"Arrow",
[](ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & /* params */,
const FormatSettings & /* format_settings */)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample);
});
"Arrow",
[](ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & /* params */,
const FormatSettings & /* format_settings */)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, false);
});
factory.registerInputFormatProcessor(
"ArrowStream",
[](ReadBuffer & buf,
const Block & sample,
const RowInputFormatParams & /* params */,
const FormatSettings & /* format_settings */)
{
return std::make_shared<ArrowBlockInputFormat>(buf, sample, true);
});
}
}

View File

@ -4,6 +4,7 @@
#include <Processors/Formats/IInputFormat.h>
namespace arrow { class RecordBatchReader; }
namespace arrow::ipc { class RecordBatchFileReader; }
namespace DB
@ -14,7 +15,7 @@ class ReadBuffer;
class ArrowBlockInputFormat : public IInputFormat
{
public:
ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_);
ArrowBlockInputFormat(ReadBuffer & in_, const Block & header_, bool stream_);
void resetParser() override;
@ -24,12 +25,17 @@ protected:
Chunk generate() override;
private:
void prepareReader();
private:
// Whether to use ArrowStream format
bool stream;
// This field is only used for ArrowStream format
std::shared_ptr<arrow::RecordBatchReader> stream_reader;
// The following fields are used only for Arrow format
std::shared_ptr<arrow::ipc::RecordBatchFileReader> file_reader;
int record_batch_total = 0;
int record_batch_current = 0;
void prepareReader();
};
}

View File

@ -15,8 +15,8 @@ namespace ErrorCodes
extern const int UNKNOWN_EXCEPTION;
}
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), format_settings{format_settings_}, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
ArrowBlockOutputFormat::ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_)
: IOutputFormat(header_, out_), stream{stream_}, format_settings{format_settings_}, arrow_ostream{std::make_shared<ArrowBufferedOutputStream>(out_)}
{
}
@ -29,12 +29,7 @@ void ArrowBlockOutputFormat::consume(Chunk chunk)
CHColumnToArrowColumn::chChunkToArrowTable(arrow_table, header, chunk, columns_num, "Arrow");
if (!writer)
{
// TODO: should we use arrow::ipc::IpcOptions::alignment?
auto status = arrow::ipc::RecordBatchFileWriter::Open(arrow_ostream.get(), arrow_table->schema(), &writer);
if (!status.ok())
throw Exception{"Error while opening a table: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
prepareWriter(arrow_table->schema());
// TODO: calculate row_group_size depending on a number of rows and table size
auto status = writer->WriteTable(*arrow_table, format_settings.arrow.row_group_size);
@ -53,6 +48,20 @@ void ArrowBlockOutputFormat::finalize()
}
}
void ArrowBlockOutputFormat::prepareWriter(const std::shared_ptr<arrow::Schema> & schema)
{
arrow::Status status;
// TODO: should we use arrow::ipc::IpcOptions::alignment?
if (stream)
status = arrow::ipc::RecordBatchStreamWriter::Open(arrow_ostream.get(), schema, &writer);
else
status = arrow::ipc::RecordBatchFileWriter::Open(arrow_ostream.get(), schema, &writer);
if (!status.ok())
throw Exception{"Error while opening a table writer: " + status.ToString(), ErrorCodes::UNKNOWN_EXCEPTION};
}
void registerOutputFormatProcessorArrow(FormatFactory & factory)
{
factory.registerOutputFormatProcessor(
@ -62,7 +71,17 @@ void registerOutputFormatProcessorArrow(FormatFactory & factory)
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, format_settings);
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, false, format_settings);
});
factory.registerOutputFormatProcessor(
"ArrowStream",
[](WriteBuffer & buf,
const Block & sample,
FormatFactory::WriteCallback,
const FormatSettings & format_settings)
{
return std::make_shared<ArrowBlockOutputFormat>(buf, sample, true, format_settings);
});
}

View File

@ -6,6 +6,7 @@
#include <Processors/Formats/IOutputFormat.h>
#include "ArrowBufferedStreams.h"
namespace arrow { class Schema; }
namespace arrow::ipc { class RecordBatchWriter; }
namespace DB
@ -14,7 +15,7 @@ namespace DB
class ArrowBlockOutputFormat : public IOutputFormat
{
public:
ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, const FormatSettings & format_settings_);
ArrowBlockOutputFormat(WriteBuffer & out_, const Block & header_, bool stream_, const FormatSettings & format_settings_);
String getName() const override { return "ArrowBlockOutputFormat"; }
void consume(Chunk) override;
@ -23,9 +24,12 @@ public:
String getContentType() const override { return "application/octet-stream"; }
private:
bool stream;
const FormatSettings format_settings;
std::shared_ptr<ArrowBufferedOutputStream> arrow_ostream;
std::shared_ptr<arrow::ipc::RecordBatchWriter> writer;
void prepareWriter(const std::shared_ptr<arrow::Schema> & schema);
};
}

View File

@ -23,7 +23,7 @@ public:
/// Will connect pipes outputs with transform inputs automatically.
Pipe(Pipes && pipes, ProcessorPtr transform);
/// Create pipe from output port. If pipe was created that way, it possibly will not have tree shape.
Pipe(OutputPort * port);
explicit Pipe(OutputPort * port);
Pipe(const Pipe & other) = delete;
Pipe(Pipe && other) = default;

View File

@ -34,6 +34,14 @@ void QueryPipeline::checkInitialized()
throw Exception("QueryPipeline wasn't initialized.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::checkInitializedAndNotCompleted()
{
checkInitialized();
if (streams.empty())
throw Exception("QueryPipeline was already completed.", ErrorCodes::LOGICAL_ERROR);
}
void QueryPipeline::checkSource(const ProcessorPtr & source, bool can_have_totals)
{
if (!source->getInputs().empty())
@ -194,11 +202,11 @@ static ProcessorPtr callProcessorGetter(
template <typename TProcessorGetter>
void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
{
checkInitialized();
checkInitializedAndNotCompleted();
Block header;
auto add_transform = [&](OutputPort *& stream, StreamType stream_type, size_t stream_num [[maybe_unused]] = IProcessor::NO_STREAM)
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
{
if (!stream)
return;
@ -231,17 +239,14 @@ void QueryPipeline::addSimpleTransformImpl(const TProcessorGetter & getter)
if (transform)
{
// if (stream_type == StreamType::Main)
// transform->setStream(stream_num);
connect(*stream, transform->getInputs().front());
stream = &transform->getOutputs().front();
processors.emplace_back(std::move(transform));
}
};
for (size_t stream_num = 0; stream_num < streams.size(); ++stream_num)
add_transform(streams[stream_num], StreamType::Main, stream_num);
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
@ -259,9 +264,50 @@ void QueryPipeline::addSimpleTransform(const ProcessorGetterWithStreamKind & get
addSimpleTransformImpl(getter);
}
void QueryPipeline::setSinks(const ProcessorGetterWithStreamKind & getter)
{
checkInitializedAndNotCompleted();
auto add_transform = [&](OutputPort *& stream, StreamType stream_type)
{
if (!stream)
return;
auto transform = getter(stream->getHeader(), stream_type);
if (transform)
{
if (transform->getInputs().size() != 1)
throw Exception("Sink for query pipeline transform should have single input, "
"but " + transform->getName() + " has " +
toString(transform->getInputs().size()) + " inputs.", ErrorCodes::LOGICAL_ERROR);
if (!transform->getOutputs().empty())
throw Exception("Sink for query pipeline transform should have no outputs, "
"but " + transform->getName() + " has " +
toString(transform->getOutputs().size()) + " outputs.", ErrorCodes::LOGICAL_ERROR);
}
if (!transform)
transform = std::make_shared<NullSink>(stream->getHeader());
connect(*stream, transform->getInputs().front());
processors.emplace_back(std::move(transform));
};
for (auto & stream : streams)
add_transform(stream, StreamType::Main);
add_transform(totals_having_port, StreamType::Totals);
add_transform(extremes_port, StreamType::Extremes);
streams.clear();
current_header.clear();
}
void QueryPipeline::addPipe(Processors pipe)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (pipe.empty())
throw Exception("Can't add empty processors list to QueryPipeline.", ErrorCodes::LOGICAL_ERROR);
@ -298,7 +344,7 @@ void QueryPipeline::addPipe(Processors pipe)
void QueryPipeline::addDelayedStream(ProcessorPtr source)
{
checkInitialized();
checkInitializedAndNotCompleted();
checkSource(source, false);
assertBlocksHaveEqualStructure(current_header, source->getOutputs().front().getHeader(), "QueryPipeline");
@ -313,7 +359,7 @@ void QueryPipeline::addDelayedStream(ProcessorPtr source)
void QueryPipeline::resize(size_t num_streams, bool force, bool strict)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!force && num_streams == getNumStreams())
return;
@ -347,7 +393,7 @@ void QueryPipeline::enableQuotaForCurrentStreams()
void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!typeid_cast<const TotalsHavingTransform *>(transform.get()))
throw Exception("TotalsHavingTransform expected for QueryPipeline::addTotalsHavingTransform.",
@ -370,7 +416,7 @@ void QueryPipeline::addTotalsHavingTransform(ProcessorPtr transform)
void QueryPipeline::addDefaultTotals()
{
checkInitialized();
checkInitializedAndNotCompleted();
if (totals_having_port)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -392,7 +438,7 @@ void QueryPipeline::addDefaultTotals()
void QueryPipeline::addTotals(ProcessorPtr source)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (totals_having_port)
throw Exception("Totals having transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -423,7 +469,7 @@ void QueryPipeline::dropTotalsAndExtremes()
void QueryPipeline::addExtremesTransform()
{
checkInitialized();
checkInitializedAndNotCompleted();
if (extremes_port)
throw Exception("Extremes transform was already added to pipeline.", ErrorCodes::LOGICAL_ERROR);
@ -450,7 +496,7 @@ void QueryPipeline::addExtremesTransform()
void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
{
checkInitialized();
checkInitializedAndNotCompleted();
if (!typeid_cast<const CreatingSetsTransform *>(transform.get()))
throw Exception("CreatingSetsTransform expected for QueryPipeline::addExtremesTransform.",
@ -467,14 +513,14 @@ void QueryPipeline::addCreatingSetsTransform(ProcessorPtr transform)
processors.emplace_back(std::move(concat));
}
void QueryPipeline::setOutput(ProcessorPtr output)
void QueryPipeline::setOutputFormat(ProcessorPtr output)
{
checkInitialized();
checkInitializedAndNotCompleted();
auto * format = dynamic_cast<IOutputFormat * >(output.get());
if (!format)
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutput.", ErrorCodes::LOGICAL_ERROR);
throw Exception("IOutputFormat processor expected for QueryPipeline::setOutputFormat.", ErrorCodes::LOGICAL_ERROR);
if (output_format)
throw Exception("QueryPipeline already has output.", ErrorCodes::LOGICAL_ERROR);
@ -507,19 +553,25 @@ void QueryPipeline::setOutput(ProcessorPtr output)
connect(*totals_having_port, totals);
connect(*extremes_port, extremes);
streams.clear();
current_header.clear();
extremes_port = nullptr;
totals_having_port = nullptr;
initRowsBeforeLimit();
}
void QueryPipeline::unitePipelines(
std::vector<QueryPipeline> && pipelines, const Block & common_header)
{
checkInitialized();
addSimpleTransform([&](const Block & header)
if (initialized())
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
std::vector<OutputPort *> extremes;
std::vector<OutputPort *> totals;
@ -534,11 +586,14 @@ void QueryPipeline::unitePipelines(
{
pipeline.checkInitialized();
pipeline.addSimpleTransform([&](const Block & header)
if (!pipeline.isCompleted())
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<ConvertingTransform>(
header, common_header, ConvertingTransform::MatchColumnsMode::Position);
});
}
if (pipeline.extremes_port)
{
@ -703,6 +758,11 @@ void QueryPipeline::initRowsBeforeLimit()
Pipe QueryPipeline::getPipe() &&
{
resize(1);
return std::move(std::move(*this).getPipes()[0]);
}
Pipes QueryPipeline::getPipes() &&
{
Pipe pipe(std::move(processors), streams.at(0), totals_having_port, extremes_port);
pipe.max_parallel_streams = streams.maxParallelStreams();
@ -721,15 +781,19 @@ Pipe QueryPipeline::getPipe() &&
if (extremes_port)
pipe.setExtremesPort(extremes_port);
return pipe;
Pipes pipes;
pipes.emplace_back(std::move(pipe));
for (size_t i = 1; i < streams.size(); ++i)
pipes.emplace_back(Pipe(streams[i]));
return pipes;
}
PipelineExecutorPtr QueryPipeline::execute()
{
checkInitialized();
if (!output_format)
throw Exception("Cannot execute pipeline because it doesn't have output.", ErrorCodes::LOGICAL_ERROR);
if (!isCompleted())
throw Exception("Cannot execute pipeline because it is not completed.", ErrorCodes::LOGICAL_ERROR);
return std::make_shared<PipelineExecutor>(processors, process_list_element);
}

View File

@ -28,6 +28,7 @@ private:
{
public:
auto size() const { return data.size(); }
bool empty() const { return size() == 0; }
auto begin() { return data.begin(); }
auto end() { return data.end(); }
auto & front() { return data.front(); }
@ -81,6 +82,7 @@ public:
void init(Pipes pipes);
void init(Pipe pipe); /// Simple init for single pipe
bool initialized() { return !processors.empty(); }
bool isCompleted() { return initialized() && streams.empty(); }
/// Type of logical data stream for simple transform.
/// Sometimes it's important to know which part of pipeline we are working for.
@ -95,13 +97,23 @@ public:
using ProcessorGetter = std::function<ProcessorPtr(const Block & header)>;
using ProcessorGetterWithStreamKind = std::function<ProcessorPtr(const Block & header, StreamType stream_type)>;
/// Add transform with simple input and simple output for each port.
void addSimpleTransform(const ProcessorGetter & getter);
void addSimpleTransform(const ProcessorGetterWithStreamKind & getter);
/// Add several processors. They must have same header for inputs and same for outputs.
/// Total number of inputs must be the same as the number of streams. Output ports will become new streams.
void addPipe(Processors pipe);
/// Add TotalsHavingTransform. Resize pipeline to single input. Adds totals port.
void addTotalsHavingTransform(ProcessorPtr transform);
/// Add transform which calculates extremes. This transform adds extremes port and doesn't change inputs number.
void addExtremesTransform();
/// Adds transform which creates sets. It will be executed before reading any data from input ports.
void addCreatingSetsTransform(ProcessorPtr transform);
void setOutput(ProcessorPtr output);
/// Resize pipeline to single output and add IOutputFormat. Pipeline will be completed after this transformation.
void setOutputFormat(ProcessorPtr output);
/// Sink is a processor with single input port and no output ports. Creates sink for each output port.
/// Pipeline will be completed after this transformation.
void setSinks(const ProcessorGetterWithStreamKind & getter);
/// Add totals which returns one chunk with single row with defaults.
void addDefaultTotals();
@ -118,6 +130,7 @@ public:
/// Check if resize transform was used. (In that case another distinct transform will be added).
bool hasMixedStreams() const { return has_resize || hasMoreThanOneStream(); }
/// Changes the number of input ports if needed. Adds ResizeTransform.
void resize(size_t num_streams, bool force = false, bool strict = false);
void enableQuotaForCurrentStreams();
@ -155,8 +168,9 @@ public:
/// Set upper limit for the recommend number of threads
void setMaxThreads(size_t max_threads_) { max_threads = max_threads_; }
/// Convert query pipeline to single pipe.
/// Convert query pipeline to single or several pipes.
Pipe getPipe() &&;
Pipes getPipes() &&;
private:
/// Destruction order: processors, header, locks, temporary storages, local contexts
@ -193,6 +207,7 @@ private:
QueryStatus * process_list_element = nullptr;
void checkInitialized();
void checkInitializedAndNotCompleted();
static void checkSource(const ProcessorPtr & source, bool can_have_totals);
template <typename TProcessorGetter>

View File

@ -1,5 +1,6 @@
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Interpreters/sortBlock.h>
#include <Common/PODArray.h>
namespace DB
{
@ -11,6 +12,38 @@ PartialSortingTransform::PartialSortingTransform(
{
}
static ColumnRawPtrs extractColumns(const Block & block, const SortDescription & description)
{
size_t size = description.size();
ColumnRawPtrs res;
res.reserve(size);
for (size_t i = 0; i < size; ++i)
{
const IColumn * column = !description[i].column_name.empty()
? block.getByName(description[i].column_name).column.get()
: block.safeGetByPosition(description[i].column_number).column.get();
res.emplace_back(column);
}
return res;
}
bool less(const ColumnRawPtrs & lhs, UInt64 lhs_row_num,
const ColumnRawPtrs & rhs, UInt64 rhs_row_num, const SortDescription & description)
{
size_t size = description.size();
for (size_t i = 0; i < size; ++i)
{
int res = description[i].direction * lhs[i]->compareAt(lhs_row_num, rhs_row_num, *rhs[i], 1);
if (res < 0)
return true;
else if (res > 0)
return false;
}
return false;
}
void PartialSortingTransform::transform(Chunk & chunk)
{
if (read_rows)
@ -19,7 +52,42 @@ void PartialSortingTransform::transform(Chunk & chunk)
auto block = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
chunk.clear();
ColumnRawPtrs block_columns;
UInt64 rows_num = block.rows();
if (!threshold_block_columns.empty())
{
IColumn::Filter filter(rows_num, 0);
block_columns = extractColumns(block, description);
size_t filtered_count = 0;
for (UInt64 i = 0; i < rows_num; ++i)
{
if (less(threshold_block_columns, limit - 1, block_columns, i, description))
{
++filtered_count;
filter[i] = 1;
}
}
if (filtered_count)
{
for (auto & column : block.getColumns())
{
column = column->filter(filter, filtered_count);
}
}
}
sortBlock(block, description, limit);
if (limit && limit < block.rows() &&
(threshold_block_columns.empty() || less(block_columns, limit - 1, threshold_block_columns, limit - 1, description)))
{
threshold_block = block.cloneWithColumns(block.getColumns());
threshold_block_columns = extractColumns(threshold_block, description);
}
chunk.setColumns(block.getColumns(), block.rows());
}

View File

@ -29,6 +29,8 @@ private:
SortDescription description;
UInt64 limit;
RowsBeforeLimitCounterPtr read_rows;
Block threshold_block;
ColumnRawPtrs threshold_block_columns;
};
}

View File

View File

@ -6,6 +6,7 @@
#include <Common/CurrentMetrics.h>
#include <Common/HTMLForm.h>
#include <Core/Names.h>
#include <re2/re2.h>
@ -21,7 +22,7 @@ namespace DB
class WriteBufferFromHTTPServerResponse;
typedef std::shared_ptr<const re2::RE2> CompiledRegexPtr;
using CompiledRegexPtr = std::shared_ptr<const re2::RE2>;
class HTTPHandler : public Poco::Net::HTTPRequestHandler
{

View File

@ -4,6 +4,7 @@
#include <re2/stringpiece.h>
#include <common/find_symbols.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Util/LayeredConfiguration.h>
#include "HTTPHandler.h"
#include "NotFoundHandler.h"

View File

@ -6,6 +6,7 @@
#include <re2/stringpiece.h>
#include <Poco/StringTokenizer.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/find_symbols.h>

View File

@ -1,14 +1,22 @@
#pragma once
#include <Poco/Logger.h>
#include <Poco/Util/LayeredConfiguration.h>
namespace Poco
{
#include <Interpreters/Context.h>
namespace Util
{
class LayeredConfiguration;
}
class Logger;
}
namespace DB
{
class Context;
class IServer
{
public:

View File

@ -3,6 +3,7 @@
#include <Poco/Net/HTTPBasicCredentials.h>
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <common/logger_useful.h>
#include <Common/HTMLForm.h>
#include <Common/setThreadName.h>

View File

@ -4,8 +4,7 @@
#include <Poco/Util/Application.h>
#include <common/logger_useful.h>
#include <ext/scope_guard.h>
#include "IServer.h"
#include "MySQLHandler.h"
#include <Server/MySQLHandler.h>
#if USE_SSL
# include <Poco/Net/SSLManager.h>

View File

@ -2,7 +2,8 @@
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <atomic>
#include "IServer.h"
#include <memory>
#include <Server/IServer.h>
#if !defined(ARCADIA_BUILD)
# include <Common/config.h>

View File

@ -6,6 +6,7 @@
#include <Poco/Net/HTTPServerRequest.h>
#include <Poco/Net/HTTPServerResponse.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>

View File

@ -4,6 +4,8 @@
#include <Poco/Net/HTTPRequestHandler.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/Types.h>
#include <IO/WriteBuffer.h>
namespace DB

View File

@ -262,8 +262,8 @@ void TCPHandler::runImpl()
else if (state.need_receive_data_for_input)
{
/// It is special case for input(), all works for reading data from client will be done in callbacks.
/// state.io.in is NullAndDoCopyBlockInputStream so read it once.
state.io.in->read();
auto executor = state.io.pipeline.execute();
executor->execute(state.io.pipeline.getNumThreads());
state.io.onFinish();
}
else if (state.io.pipeline.initialized())

View File

@ -3,8 +3,8 @@
#include <Poco/Net/TCPServerConnectionFactory.h>
#include <Poco/Net/NetException.h>
#include <common/logger_useful.h>
#include "IServer.h"
#include "TCPHandler.h"
#include <Server/IServer.h>
#include <Server/TCPHandler.h>
namespace Poco { class Logger; }

22
src/Server/ya.make Normal file
View File

@ -0,0 +1,22 @@
LIBRARY()
PEERDIR(
clickhouse/src/Common
contrib/libs/poco/Util
)
SRCS(
HTTPHandler.cpp
HTTPHandlerFactory.cpp
InterserverIOHTTPHandler.cpp
MySQLHandler.cpp
MySQLHandlerFactory.cpp
NotFoundHandler.cpp
PrometheusMetricsWriter.cpp
PrometheusRequestHandler.cpp
ReplicasStatusHandler.cpp
StaticRequestHandler.cpp
TCPHandler.cpp
)
END()

View File

@ -110,8 +110,9 @@ void StorageDistributedDirectoryMonitor::flushAllData()
{
if (!quit)
{
CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0};
std::unique_lock lock{mutex};
processFiles();
processFiles(metric_pending_files);
}
}
@ -131,6 +132,9 @@ void StorageDistributedDirectoryMonitor::run()
{
std::unique_lock lock{mutex};
/// This metric will be updated with the number of pending files later.
CurrentMetrics::Increment metric_pending_files{CurrentMetrics::DistributedFilesToInsert, 0};
bool do_sleep = false;
while (!quit)
{
@ -139,7 +143,7 @@ void StorageDistributedDirectoryMonitor::run()
{
try
{
do_sleep = !processFiles();
do_sleep = !processFiles(metric_pending_files);
}
catch (...)
{
@ -222,7 +226,7 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
}
bool StorageDistributedDirectoryMonitor::processFiles()
bool StorageDistributedDirectoryMonitor::processFiles(CurrentMetrics::Increment & metric_pending_files)
{
std::map<UInt64, std::string> files;
@ -236,14 +240,16 @@ bool StorageDistributedDirectoryMonitor::processFiles()
files[parse<UInt64>(file_path.getBaseName())] = file_path_str;
}
/// Note: the value of this metric will be kept if this function will throw an exception.
/// This is needed, because in case of exception, files still pending.
metric_pending_files.changeTo(files.size());
if (files.empty())
return false;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedFilesToInsert, CurrentMetrics::Value(files.size())};
if (should_batch_inserts)
{
processFilesWithBatching(files);
processFilesWithBatching(files, metric_pending_files);
}
else
{
@ -252,14 +258,14 @@ bool StorageDistributedDirectoryMonitor::processFiles()
if (quit)
return true;
processFile(file.second);
processFile(file.second, metric_pending_files);
}
}
return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files)
{
LOG_TRACE(log, "Started processing `{}`", file_path);
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(storage.global_context->getSettingsRef());
@ -289,6 +295,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
}
Poco::File{file_path}.remove();
metric_pending_files.sub();
LOG_TRACE(log, "Finished processing `{}`", file_path);
}
@ -584,7 +591,9 @@ bool StorageDistributedDirectoryMonitor::scheduleAfter(size_t ms)
return task_handle->scheduleAfter(ms, false);
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
void StorageDistributedDirectoryMonitor::processFilesWithBatching(
const std::map<UInt64, std::string> & files,
CurrentMetrics::Increment & metric_pending_files)
{
std::unordered_set<UInt64> file_indices_to_skip;
@ -596,6 +605,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
batch.readText(in);
file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
@ -656,13 +666,17 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
batch.total_bytes += total_bytes;
if (batch.isEnoughSize())
{
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
}
for (auto & kv : header_to_batch)
{
Batch & batch = kv.second;
batch.send();
metric_pending_files.sub(batch.file_indices.size());
}
/// current_batch.txt will not exist if there was no send

View File

@ -9,6 +9,8 @@
#include <IO/ReadBufferFromFile.h>
namespace CurrentMetrics { class Increment; }
namespace DB
{
@ -37,9 +39,9 @@ public:
bool scheduleAfter(size_t ms);
private:
void run();
bool processFiles();
void processFile(const std::string & file_path);
void processFilesWithBatching(const std::map<UInt64, std::string> & files);
bool processFiles(CurrentMetrics::Increment & metric_pending_files);
void processFile(const std::string & file_path, CurrentMetrics::Increment & metric_pending_files);
void processFilesWithBatching(const std::map<UInt64, std::string> & files, CurrentMetrics::Increment & metric_pending_files);
static bool isFileBrokenErrorCode(int code);
void markAsBroken(const std::string & file_path) const;

Some files were not shown because too many files have changed in this diff Show More