Merge branch 'master' into remove-strange-code-fix-19283

This commit is contained in:
Alexey Milovidov 2021-04-18 03:22:10 +03:00
commit c669ac81a0
130 changed files with 2905 additions and 412 deletions

View File

@ -271,9 +271,13 @@ struct integer<Bits, Signed>::_impl
/// As to_Integral does a static_cast to int64_t, it may result in UB.
/// The necessary check here is that long double has enough significant (mantissa) bits to store the
/// int64_t max value precisely.
//TODO Be compatible with Apple aarch64
#if not (defined(__APPLE__) && defined(__aarch64__))
static_assert(LDBL_MANT_DIG >= 64,
"On your system long double has less than 64 precision bits,"
"which may result in UB when initializing double from int64_t");
#endif
if ((rhs > 0 && rhs < static_cast<long double>(max_int)) || (rhs < 0 && rhs > static_cast<long double>(min_int)))
{

View File

@ -1,7 +1,7 @@
if (CMAKE_SYSTEM_PROCESSOR MATCHES "amd64|x86_64")
set (ARCH_AMD64 1)
endif ()
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*)")
if (CMAKE_SYSTEM_PROCESSOR MATCHES "^(aarch64.*|AARCH64.*|arm64.*|ARM64.*)")
set (ARCH_AARCH64 1)
endif ()
if (ARCH_AARCH64 OR CMAKE_SYSTEM_PROCESSOR MATCHES "arm")

View File

@ -4,6 +4,9 @@ set (DEFAULT_LIBS "${DEFAULT_LIBS} ${COVERAGE_OPTION} -lc -lm -lpthread -ldl")
if (COMPILER_GCC)
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lgcc_eh")
if (ARCH_AARCH64)
set (DEFAULT_LIBS "${DEFAULT_LIBS} -lgcc")
endif ()
endif ()
message(STATUS "Default libraries: ${DEFAULT_LIBS}")

View File

@ -0,0 +1,14 @@
set (CMAKE_SYSTEM_NAME "Darwin")
set (CMAKE_SYSTEM_PROCESSOR "aarch64")
set (CMAKE_C_COMPILER_TARGET "aarch64-apple-darwin")
set (CMAKE_CXX_COMPILER_TARGET "aarch64-apple-darwin")
set (CMAKE_ASM_COMPILER_TARGET "aarch64-apple-darwin")
set (CMAKE_OSX_SYSROOT "${CMAKE_CURRENT_LIST_DIR}/../toolchain/darwin-aarch64")
set (CMAKE_TRY_COMPILE_TARGET_TYPE STATIC_LIBRARY) # disable linkage check - it doesn't work in CMake
set (HAS_PRE_1970_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_PRE_1970_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE "0" CACHE STRING "Result from TRY_RUN" FORCE)
set (HAS_POST_2038_EXITCODE__TRYRUN_OUTPUT "" CACHE STRING "Output from TRY_RUN" FORCE)

View File

@ -64,7 +64,8 @@ if (NOT OPENLDAP_FOUND AND NOT MISSING_INTERNAL_LDAP_LIBRARY)
( "${_system_name}" STREQUAL "linux" AND "${_system_processor}" STREQUAL "aarch64" ) OR
( "${_system_name}" STREQUAL "linux" AND "${_system_processor}" STREQUAL "ppc64le" ) OR
( "${_system_name}" STREQUAL "freebsd" AND "${_system_processor}" STREQUAL "x86_64" ) OR
( "${_system_name}" STREQUAL "darwin" AND "${_system_processor}" STREQUAL "x86_64" )
( "${_system_name}" STREQUAL "darwin" AND "${_system_processor}" STREQUAL "x86_64" ) OR
( "${_system_name}" STREQUAL "darwin" AND "${_system_processor}" STREQUAL "aarch64" )
)
set (_ldap_supported_platform TRUE)
endif ()

View File

@ -1,3 +1,7 @@
if (OS_DARWIN AND ARCH_AARCH64)
set (ENABLE_ROCKSDB OFF CACHE INTERNAL "")
endif()
option(ENABLE_ROCKSDB "Enable ROCKSDB" ${ENABLE_LIBRARIES})
if (NOT ENABLE_ROCKSDB)

2
contrib/boost vendored

@ -1 +1 @@
Subproject commit ee24fa55bc46e4d2ce7d0d052cc5a0d9b1be8c36
Subproject commit a8d43d3142cc6b26fc55bec33f7f6edb1156ab7a

View File

@ -1,10 +1,13 @@
if (SANITIZE OR NOT (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE) OR NOT (OS_LINUX OR OS_FREEBSD OR OS_DARWIN))
if (SANITIZE OR NOT (
((OS_LINUX OR OS_FREEBSD) AND (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE)) OR
(OS_DARWIN AND CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo")
))
if (ENABLE_JEMALLOC)
message (${RECONFIGURE_MESSAGE_LEVEL}
"jemalloc is disabled implicitly: it doesn't work with sanitizers and can only be used with x86_64, aarch64 or ppc64le on linux or freebsd.")
endif()
"jemalloc is disabled implicitly: it doesn't work with sanitizers and can only be used with x86_64, aarch64, or ppc64le Linux or FreeBSD builds and RelWithDebInfo macOS builds.")
endif ()
set (ENABLE_JEMALLOC OFF)
else()
else ()
option (ENABLE_JEMALLOC "Enable jemalloc allocator" ${ENABLE_LIBRARIES})
endif ()
@ -34,9 +37,9 @@ if (OS_LINUX)
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:5000,dirty_decay_ms:5000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache

View File

@ -42,7 +42,7 @@
* total number of bits in a pointer, e.g. on x64, for which the uppermost 16
* bits are the same as bit 47.
*/
#define LG_VADDR 48
#define LG_VADDR 64
/* Defined if C11 atomics are available. */
#define JEMALLOC_C11_ATOMICS 1
@ -101,11 +101,6 @@
*/
#define JEMALLOC_HAVE_MACH_ABSOLUTE_TIME 1
/*
* Defined if clock_gettime(CLOCK_REALTIME, ...) is available.
*/
#define JEMALLOC_HAVE_CLOCK_REALTIME 1
/*
* Defined if _malloc_thread_cleanup() exists. At least in the case of
* FreeBSD, pthread_key_create() allocates, which if used during malloc
@ -181,14 +176,14 @@
/* #undef LG_QUANTUM */
/* One page is 2^LG_PAGE bytes. */
#define LG_PAGE 16
#define LG_PAGE 14
/*
* One huge page is 2^LG_HUGEPAGE bytes. Note that this is defined even if the
* system does not explicitly support huge pages; system calls that require
* explicit huge page support are separately configured.
*/
#define LG_HUGEPAGE 29
#define LG_HUGEPAGE 21
/*
* If defined, adjacent virtual memory mappings with identical attributes
@ -356,7 +351,7 @@
/* #undef JEMALLOC_EXPORT */
/* config.malloc_conf options string. */
#define JEMALLOC_CONFIG_MALLOC_CONF "@JEMALLOC_CONFIG_MALLOC_CONF@"
#define JEMALLOC_CONFIG_MALLOC_CONF ""
/* If defined, jemalloc takes the malloc/free/etc. symbol names. */
/* #undef JEMALLOC_IS_MALLOC */

View File

@ -66,7 +66,7 @@
#cmakedefine WITH_SASL_OAUTHBEARER 1
#cmakedefine WITH_SASL_CYRUS 1
// crc32chw
#if !defined(__PPC__) && (!defined(__aarch64__) || defined(__ARM_FEATURE_CRC32))
#if !defined(__PPC__) && (!defined(__aarch64__) || defined(__ARM_FEATURE_CRC32)) && !(defined(__aarch64__) && defined(__APPLE__))
#define WITH_CRC32C_HW 1
#endif
// regex

View File

@ -0,0 +1,63 @@
/* include/lber_types.h. Generated from lber_types.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* LBER types
*/
#ifndef _LBER_TYPES_H
#define _LBER_TYPES_H
#include <ldap_cdefs.h>
LDAP_BEGIN_DECL
/* LBER boolean, enum, integers (32 bits or larger) */
#define LBER_INT_T int
/* LBER tags (32 bits or larger) */
#define LBER_TAG_T long
/* LBER socket descriptor */
#define LBER_SOCKET_T int
/* LBER lengths (32 bits or larger) */
#define LBER_LEN_T long
/* ------------------------------------------------------------ */
/* booleans, enumerations, and integers */
typedef LBER_INT_T ber_int_t;
/* signed and unsigned versions */
typedef signed LBER_INT_T ber_sint_t;
typedef unsigned LBER_INT_T ber_uint_t;
/* tags */
typedef unsigned LBER_TAG_T ber_tag_t;
/* "socket" descriptors */
typedef LBER_SOCKET_T ber_socket_t;
/* lengths */
typedef unsigned LBER_LEN_T ber_len_t;
/* signed lengths */
typedef signed LBER_LEN_T ber_slen_t;
LDAP_END_DECL
#endif /* _LBER_TYPES_H */

View File

@ -0,0 +1,74 @@
/* include/ldap_config.h. Generated from ldap_config.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* This file works in conjunction with OpenLDAP configure system.
* If you do no like the values below, adjust your configure options.
*/
#ifndef _LDAP_CONFIG_H
#define _LDAP_CONFIG_H
/* directory separator */
#ifndef LDAP_DIRSEP
#ifndef _WIN32
#define LDAP_DIRSEP "/"
#else
#define LDAP_DIRSEP "\\"
#endif
#endif
/* directory for temporary files */
#if defined(_WIN32)
# define LDAP_TMPDIR "C:\\." /* we don't have much of a choice */
#elif defined( _P_tmpdir )
# define LDAP_TMPDIR _P_tmpdir
#elif defined( P_tmpdir )
# define LDAP_TMPDIR P_tmpdir
#elif defined( _PATH_TMPDIR )
# define LDAP_TMPDIR _PATH_TMPDIR
#else
# define LDAP_TMPDIR LDAP_DIRSEP "tmp"
#endif
/* directories */
#ifndef LDAP_BINDIR
#define LDAP_BINDIR "/tmp/ldap-prefix/bin"
#endif
#ifndef LDAP_SBINDIR
#define LDAP_SBINDIR "/tmp/ldap-prefix/sbin"
#endif
#ifndef LDAP_DATADIR
#define LDAP_DATADIR "/tmp/ldap-prefix/share/openldap"
#endif
#ifndef LDAP_SYSCONFDIR
#define LDAP_SYSCONFDIR "/tmp/ldap-prefix/etc/openldap"
#endif
#ifndef LDAP_LIBEXECDIR
#define LDAP_LIBEXECDIR "/tmp/ldap-prefix/libexec"
#endif
#ifndef LDAP_MODULEDIR
#define LDAP_MODULEDIR "/tmp/ldap-prefix/libexec/openldap"
#endif
#ifndef LDAP_RUNDIR
#define LDAP_RUNDIR "/tmp/ldap-prefix/var"
#endif
#ifndef LDAP_LOCALEDIR
#define LDAP_LOCALEDIR ""
#endif
#endif /* _LDAP_CONFIG_H */

View File

@ -0,0 +1,61 @@
/* include/ldap_features.h. Generated from ldap_features.hin by configure. */
/* $OpenLDAP$ */
/* This work is part of OpenLDAP Software <http://www.openldap.org/>.
*
* Copyright 1998-2020 The OpenLDAP Foundation.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted only as authorized by the OpenLDAP
* Public License.
*
* A copy of this license is available in file LICENSE in the
* top-level directory of the distribution or, alternatively, at
* <http://www.OpenLDAP.org/license.html>.
*/
/*
* LDAP Features
*/
#ifndef _LDAP_FEATURES_H
#define _LDAP_FEATURES_H 1
/* OpenLDAP API version macros */
#define LDAP_VENDOR_VERSION 20501
#define LDAP_VENDOR_VERSION_MAJOR 2
#define LDAP_VENDOR_VERSION_MINOR 5
#define LDAP_VENDOR_VERSION_PATCH X
/*
** WORK IN PROGRESS!
**
** OpenLDAP reentrancy/thread-safeness should be dynamically
** checked using ldap_get_option().
**
** The -lldap implementation is not thread-safe.
**
** The -lldap_r implementation is:
** LDAP_API_FEATURE_THREAD_SAFE (basic thread safety)
** but also be:
** LDAP_API_FEATURE_SESSION_THREAD_SAFE
** LDAP_API_FEATURE_OPERATION_THREAD_SAFE
**
** The preprocessor flag LDAP_API_FEATURE_X_OPENLDAP_THREAD_SAFE
** can be used to determine if -lldap_r is available at compile
** time. You must define LDAP_THREAD_SAFE if and only if you
** link with -lldap_r.
**
** If you fail to define LDAP_THREAD_SAFE when linking with
** -lldap_r or define LDAP_THREAD_SAFE when linking with -lldap,
** provided header definitions and declarations may be incorrect.
**
*/
/* is -lldap_r available or not */
#define LDAP_API_FEATURE_X_OPENLDAP_THREAD_SAFE 1
/* LDAP v2 Referrals */
/* #undef LDAP_API_FEATURE_X_OPENLDAP_V2_REFERRALS */
#endif /* LDAP_FEATURES */

File diff suppressed because it is too large Load Diff

View File

@ -233,3 +233,10 @@ else ()
message (STATUS "Using Poco::Foundation: ${LIBRARY_POCO_FOUNDATION} ${INCLUDE_POCO_FOUNDATION}")
endif ()
if(OS_DARWIN AND ARCH_AARCH64)
target_compile_definitions (_poco_foundation
PRIVATE
POCO_NO_STAT64
)
endif()

View File

@ -142,14 +142,14 @@ if(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
endif(HAS_ALTIVEC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "^(powerpc|ppc)64")
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
if(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64|arm64|ARM64")
CHECK_C_COMPILER_FLAG("-march=armv8-a+crc+crypto" HAS_ARMV8_CRC)
if(HAS_ARMV8_CRC)
message(STATUS " HAS_ARMV8_CRC yes")
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -march=armv8-a+crc+crypto -Wno-unused-function")
endif(HAS_ARMV8_CRC)
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64")
endif(CMAKE_SYSTEM_PROCESSOR MATCHES "aarch64|AARCH64|arm64|ARM64")
include(CheckCXXSourceCompiles)

2
contrib/zlib-ng vendored

@ -1 +1 @@
Subproject commit b82d3497a5afc46dec3c5d07e4b163b169f251d7
Subproject commit 4039bb4623905e73c6e32a0c022f144bab87b2b3

View File

@ -367,6 +367,9 @@ function run_tests
# JSON functions
01666_blns
# Requires postgresql-client
01802_test_postgresql_protocol_with_row_policy
# Depends on AWS
01801_s3_cluster
)

View File

@ -21,14 +21,14 @@ function start()
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--mysql_port 19004 --postgresql_port 19005 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2
sudo -E -u clickhouse /usr/bin/clickhouse server --config /etc/clickhouse-server2/config.xml --daemon \
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--mysql_port 29004 --postgresql_port 29005 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3
fi

View File

@ -28,7 +28,8 @@ RUN apt-get update -y \
tree \
unixodbc \
wget \
mysql-client=5.7*
mysql-client=5.7* \
postgresql-client
RUN pip3 install numpy scipy pandas

View File

@ -44,7 +44,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
-- --path /var/lib/clickhouse1/ --logger.stderr /var/log/clickhouse-server/stderr1.log \
--logger.log /var/log/clickhouse-server/clickhouse-server1.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server1.err.log \
--tcp_port 19000 --tcp_port_secure 19440 --http_port 18123 --https_port 18443 --interserver_http_port 19009 --tcp_with_proxy_port 19010 \
--mysql_port 19004 \
--mysql_port 19004 --postgresql_port 19005 \
--keeper_server.tcp_port 19181 --keeper_server.server_id 2 \
--macros.replica r2 # It doesn't work :(
@ -52,7 +52,7 @@ if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]
-- --path /var/lib/clickhouse2/ --logger.stderr /var/log/clickhouse-server/stderr2.log \
--logger.log /var/log/clickhouse-server/clickhouse-server2.log --logger.errorlog /var/log/clickhouse-server/clickhouse-server2.err.log \
--tcp_port 29000 --tcp_port_secure 29440 --http_port 28123 --https_port 28443 --interserver_http_port 29009 --tcp_with_proxy_port 29010 \
--mysql_port 29004 \
--mysql_port 29004 --postgresql_port 29005 \
--keeper_server.tcp_port 29181 --keeper_server.server_id 3 \
--macros.shard s2 # It doesn't work :(

View File

@ -5,12 +5,13 @@ toc_title: Build on Mac OS X
# How to Build ClickHouse on Mac OS X {#how-to-build-clickhouse-on-mac-os-x}
Build should work on x86_64 (Intel) based macOS 10.15 (Catalina) and higher with recent Xcode's native AppleClang, or Homebrew's vanilla Clang or GCC compilers.
Build should work on x86_64 (Intel) and arm64 (Apple Silicon) based macOS 10.15 (Catalina) and higher with recent Xcode's native AppleClang, or Homebrew's vanilla Clang or GCC compilers.
## Install Homebrew {#install-homebrew}
``` bash
$ /bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
/bin/bash -c "$(curl -fsSL https://raw.githubusercontent.com/Homebrew/install/HEAD/install.sh)"
# ...and follow the printed instructions on any additional steps required to complete the installation.
```
## Install Xcode and Command Line Tools {#install-xcode-and-command-line-tools}
@ -22,8 +23,8 @@ Open it at least once to accept the end-user license agreement and automatically
Then, make sure that the latest Comman Line Tools are installed and selected in the system:
``` bash
$ sudo rm -rf /Library/Developer/CommandLineTools
$ sudo xcode-select --install
sudo rm -rf /Library/Developer/CommandLineTools
sudo xcode-select --install
```
Reboot.
@ -31,14 +32,15 @@ Reboot.
## Install Required Compilers, Tools, and Libraries {#install-required-compilers-tools-and-libraries}
``` bash
$ brew update
$ brew install cmake ninja libtool gettext llvm gcc
brew update
brew install cmake ninja libtool gettext llvm gcc
```
## Checkout ClickHouse Sources {#checkout-clickhouse-sources}
``` bash
$ git clone --recursive git@github.com:ClickHouse/ClickHouse.git # or https://github.com/ClickHouse/ClickHouse.git
git clone --recursive git@github.com:ClickHouse/ClickHouse.git
# ...alternatively, you can use https://github.com/ClickHouse/ClickHouse.git as the repo URL.
```
## Build ClickHouse {#build-clickhouse}
@ -46,37 +48,37 @@ $ git clone --recursive git@github.com:ClickHouse/ClickHouse.git # or https://gi
To build using Xcode's native AppleClang compiler:
``` bash
$ cd ClickHouse
$ rm -rf build
$ mkdir build
$ cd build
$ cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_JEMALLOC=OFF ..
$ cmake --build . --config RelWithDebInfo
$ cd ..
cd ClickHouse
rm -rf build
mkdir build
cd build
cmake -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
cd ..
```
To build using Homebrew's vanilla Clang compiler:
``` bash
$ cd ClickHouse
$ rm -rf build
$ mkdir build
$ cd build
$ cmake -DCMAKE_C_COMPILER=$(brew --prefix llvm)/bin/clang -DCMAKE_CXX_COMPILER==$(brew --prefix llvm)/bin/clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_JEMALLOC=OFF ..
$ cmake --build . --config RelWithDebInfo
$ cd ..
cd ClickHouse
rm -rf build
mkdir build
cd build
cmake -DCMAKE_C_COMPILER=$(brew --prefix llvm)/bin/clang -DCMAKE_CXX_COMPILER=$(brew --prefix llvm)/bin/clang++ -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
cd ..
```
To build using Homebrew's vanilla GCC compiler:
``` bash
$ cd ClickHouse
$ rm -rf build
$ mkdir build
$ cd build
$ cmake -DCMAKE_C_COMPILER=$(brew --prefix gcc)/bin/gcc-10 -DCMAKE_CXX_COMPILER=$(brew --prefix gcc)/bin/g++-10 -DCMAKE_BUILD_TYPE=RelWithDebInfo -DENABLE_JEMALLOC=OFF ..
$ cmake --build . --config RelWithDebInfo
$ cd ..
cd ClickHouse
rm -rf build
mkdir build
cd build
cmake -DCMAKE_C_COMPILER=$(brew --prefix gcc)/bin/gcc-10 -DCMAKE_CXX_COMPILER=$(brew --prefix gcc)/bin/g++-10 -DCMAKE_BUILD_TYPE=RelWithDebInfo ..
cmake --build . --config RelWithDebInfo
cd ..
```
## Caveats {#caveats}
@ -115,7 +117,7 @@ To do so, create the `/Library/LaunchDaemons/limit.maxfiles.plist` file with the
Execute the following command:
``` bash
$ sudo chown root:wheel /Library/LaunchDaemons/limit.maxfiles.plist
sudo chown root:wheel /Library/LaunchDaemons/limit.maxfiles.plist
```
Reboot.

View File

@ -18,11 +18,17 @@ Engine parameters:
- `num_layers` Parallelism layer. Physically, the table will be represented as `num_layers` of independent buffers. Recommended value: 16.
- `min_time`, `max_time`, `min_rows`, `max_rows`, `min_bytes`, and `max_bytes` Conditions for flushing data from the buffer.
Optional engine parameters:
- `flush_time`, `flush_rows`, `flush_bytes` Conditions for flushing data from the buffer, that will happen only in background (ommited or zero means no `flush*` parameters).
Data is flushed from the buffer and written to the destination table if all the `min*` conditions or at least one `max*` condition are met.
- `min_time`, `max_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes` Condition for the number of bytes in the buffer.
Also if at least one `flush*` condition are met flush initiated in background, this is different from `max*`, since `flush*` allows you to configure background flushes separately to avoid adding latency for `INSERT` (into `Buffer`) queries.
- `min_time`, `max_time`, `flush_time` Condition for the time in seconds from the moment of the first write to the buffer.
- `min_rows`, `max_rows`, `flush_rows` Condition for the number of rows in the buffer.
- `min_bytes`, `max_bytes`, `flush_bytes` Condition for the number of bytes in the buffer.
During the write operation, data is inserted to a `num_layers` number of random buffers. Or, if the data part to insert is large enough (greater than `max_rows` or `max_bytes`), it is written directly to the destination table, omitting the buffer.

View File

@ -1213,6 +1213,62 @@ SELECT arrayFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5, 6, 14,
Note that the `arrayFill` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it cant be omitted.
## arrayFold(func, arr1, …, init) {#array-fold}
Returns an result of [folding](https://en.wikipedia.org/wiki/Fold_(higher-order_function)) arrays and value `init` using function `func`.
I.e. result of calculation `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Note that the `arrayMap` is a [higher-order function](../../sql-reference/functions/index.md#higher-order-functions). You must pass a lambda function to it as the first argument, and it cant be omitted.
**Arguments**
- `func` — The lambda function with `n+1` arguments (where `n` is number of input arrays), first `n` arguments are for
current elements of input arrays, and last argument is for current value of accumulator.
- `arr` — Any number of [arrays](../../sql-reference/data-types/array.md).
- `init` - Initial value of accumulator.
**Returned value**
Final value of accumulator.
**Examples**
The following example shows how to acquire product and sum of elements of array:
``` sql
SELECT arrayMap(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3], (0, 1)) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
The following example shows how to reverse elements of array:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Folding may be used to access of already passed elements due to function calculation, for example:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,''))
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
## arrayReverseFill(func, arr1, …) {#array-reverse-fill}
Scan through `arr1` from the last element to the first element and replace `arr1[i]` by `arr1[i + 1]` if `func` returns 0. The last element of `arr1` will not be replaced.

View File

@ -16,7 +16,7 @@ The following operations with [partitions](../../../engines/table-engines/merget
- [CLEAR COLUMN IN PARTITION](#alter_clear-column-partition) — Resets the value of a specified column in a partition.
- [CLEAR INDEX IN PARTITION](#alter_clear-index-partition) — Resets the specified secondary index in a partition.
- [FREEZE PARTITION](#alter_freeze-partition) — Creates a backup of a partition.
- [FETCH PARTITION](#alter_fetch-partition) — Downloads a partition from another server.
- [FETCH PARTITION\|PART](#alter_fetch-partition) — Downloads a part or partition from another server.
- [MOVE PARTITION\|PART](#alter_move-partition) — Move partition/data part to another disk or volume.
<!-- -->
@ -198,29 +198,35 @@ ALTER TABLE table_name CLEAR INDEX index_name IN PARTITION partition_expr
The query works similar to `CLEAR COLUMN`, but it resets an index instead of a column data.
## FETCH PARTITION {#alter_fetch-partition}
## FETCH PARTITION|PART {#alter_fetch-partition}
``` sql
ALTER TABLE table_name FETCH PARTITION partition_expr FROM 'path-in-zookeeper'
ALTER TABLE table_name FETCH PARTITION|PART partition_expr FROM 'path-in-zookeeper'
```
Downloads a partition from another server. This query only works for the replicated tables.
The query does the following:
1. Downloads the partition from the specified shard. In path-in-zookeeper you must specify a path to the shard in ZooKeeper.
1. Downloads the partition|part from the specified shard. In path-in-zookeeper you must specify a path to the shard in ZooKeeper.
2. Then the query puts the downloaded data to the `detached` directory of the `table_name` table. Use the [ATTACH PARTITION\|PART](#alter_attach-partition) query to add the data to the table.
For example:
1. FETCH PARTITION
``` sql
ALTER TABLE users FETCH PARTITION 201902 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PARTITION 201902;
```
2. FETCH PART
``` sql
ALTER TABLE users FETCH PART 201901_2_2_0 FROM '/clickhouse/tables/01-01/visits';
ALTER TABLE users ATTACH PART 201901_2_2_0;
```
Note that:
- The `ALTER ... FETCH PARTITION` query isnt replicated. It places the partition to the `detached` directory only on the local server.
- The `ALTER ... FETCH PARTITION|PART` query isnt replicated. It places the part or partition to the `detached` directory only on the local server.
- The `ALTER TABLE ... ATTACH` query is replicated. It adds the data to all replicas. The data is added to one of the replicas from the `detached` directory, and to the others - from neighboring replicas.
Before downloading, the system checks if the partition exists and the table structure matches. The most appropriate replica is selected automatically from the healthy replicas.

View File

@ -5,39 +5,81 @@ toc_title: ROW POLICY
# CREATE ROW POLICY {#create-row-policy-statement}
Creates [filters for rows](../../../operations/access-rights.md#row-policy-management), which a user can read from a table.
Creates a [row policy](../../../operations/access-rights.md#row-policy-management), i.e. a filter used to determine which rows a user can read from a table.
Syntax:
``` sql
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[FOR SELECT] USING condition
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[TO {role1 [, role2 ...] | ALL | ALL EXCEPT role1 [, role2 ...]}]
```
`ON CLUSTER` clause allows creating row policies on a cluster, see [Distributed DDL](../../../sql-reference/distributed-ddl.md).
## USING Clause {#create-row-policy-using}
## AS Clause {#create-row-policy-as}
Using this section you can create permissive or restrictive policies.
Permissive policy grants access to rows. Permissive policies which apply to the same table are combined together using the boolean `OR` operator. Policies are permissive by default.
Restrictive policy restricts access to rows. Restrictive policies which apply to the same table are combined together using the boolean `AND` operator.
Restrictive policies apply to rows that passed the permissive filters. If you set restrictive policies but no permissive policies, the user cant get any row from the table.
Allows to specify a condition to filter rows. An user will see a row if the condition is calculated to non-zero for the row.
## TO Clause {#create-row-policy-to}
In the section `TO` you can provide a mixed list of roles and users, for example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
In the section `TO` you can provide a list of users and roles this policy should work for. For example, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Keyword `ALL` means all the ClickHouse users including current user. Keywords `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
Keyword `ALL` means all the ClickHouse users including current user. Keyword `ALL EXCEPT` allow to exclude some users from the all users list, for example, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
## Examples {#examples}
!!! note "Note"
If there are no row policies defined for a table then any user can `SELECT` all the row from the table. Defining one or more row policies for the table makes the access to the table depending on the row policies no matter if those row policies are defined for the current user or not. For example, the following policy
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
forbids the users `mira` and `peter` to see the rows with `b != 1`, and any non-mentioned user (e.g., the user `paul`) will see no rows from `mydb.table1` at all.
If that's not desirable it can't be fixed by adding one more row policy, like the following:
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## AS Clause {#create-row-policy-as}
It's allowed to have more than one policy enabled on the same table for the same user at the one time. So we need a way to combine the conditions from multiple policies.
By default policies are combined using the boolean `OR` operator. For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
enables the user `peter` to see rows with either `b=1` or `c=2`.
The `AS` clause specifies how policies should be combined with other policies. Policies can be either permissive or restrictive. By default policies are permissive, which means they are combined using the boolean `OR` operator.
A policy can be defined as restrictive as an alternative. Restrictive policies are combined using the boolean `AND` operator.
Here is the general formula:
```
row_is_visible = (one or more of the permissive policies' conditions are non-zero) AND
(all of the restrictive policies's conditions are non-zero)
```
For example, the following policies
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
enables the user `peter` to see rows only if both `b=1` AND `c=2`.
## ON CLUSTER Clause {#create-row-policy-on-cluster}
Allows creating row policies on a cluster, see [Distributed DDL](../../../sql-reference/distributed-ddl.md).
## Examples
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`

View File

@ -91,7 +91,7 @@ Hierarchy of privileges:
- `ALTER ADD CONSTRAINT`
- `ALTER DROP CONSTRAINT`
- `ALTER TTL`
- `ALTER MATERIALIZE TTL`
- `ALTER MATERIALIZE TTL`
- `ALTER SETTINGS`
- `ALTER MOVE PARTITION`
- `ALTER FETCH PARTITION`
@ -102,9 +102,9 @@ Hierarchy of privileges:
- [CREATE](#grant-create)
- `CREATE DATABASE`
- `CREATE TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE VIEW`
- `CREATE DICTIONARY`
- `CREATE TEMPORARY TABLE`
- [DROP](#grant-drop)
- `DROP DATABASE`
- `DROP TABLE`
@ -150,7 +150,7 @@ Hierarchy of privileges:
- `SYSTEM RELOAD`
- `SYSTEM RELOAD CONFIG`
- `SYSTEM RELOAD DICTIONARY`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM MERGES`
- `SYSTEM TTL MERGES`
- `SYSTEM FETCHES`
@ -276,10 +276,10 @@ Allows executing [ALTER](../../sql-reference/statements/alter/index.md) queries
- `ALTER ADD CONSTRAINT`. Level: `TABLE`. Aliases: `ADD CONSTRAINT`
- `ALTER DROP CONSTRAINT`. Level: `TABLE`. Aliases: `DROP CONSTRAINT`
- `ALTER TTL`. Level: `TABLE`. Aliases: `ALTER MODIFY TTL`, `MODIFY TTL`
- `ALTER MATERIALIZE TTL`. Level: `TABLE`. Aliases: `MATERIALIZE TTL`
- `ALTER MATERIALIZE TTL`. Level: `TABLE`. Aliases: `MATERIALIZE TTL`
- `ALTER SETTINGS`. Level: `TABLE`. Aliases: `ALTER SETTING`, `ALTER MODIFY SETTING`, `MODIFY SETTING`
- `ALTER MOVE PARTITION`. Level: `TABLE`. Aliases: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `FETCH PARTITION`
- `ALTER FETCH PARTITION`. Level: `TABLE`. Aliases: `ALTER FETCH PART`, `FETCH PARTITION`, `FETCH PART`
- `ALTER FREEZE PARTITION`. Level: `TABLE`. Aliases: `FREEZE PARTITION`
- `ALTER VIEW` Level: `GROUP`
- `ALTER VIEW REFRESH`. Level: `VIEW`. Aliases: `ALTER LIVE VIEW REFRESH`, `REFRESH VIEW`
@ -304,9 +304,9 @@ Allows executing [CREATE](../../sql-reference/statements/create/index.md) and [A
- `CREATE`. Level: `GROUP`
- `CREATE DATABASE`. Level: `DATABASE`
- `CREATE TABLE`. Level: `TABLE`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
- `CREATE VIEW`. Level: `VIEW`
- `CREATE DICTIONARY`. Level: `DICTIONARY`
- `CREATE TEMPORARY TABLE`. Level: `GLOBAL`
**Notes**
@ -401,7 +401,7 @@ Allows a user to execute [SYSTEM](../../sql-reference/statements/system.md) quer
- `SYSTEM RELOAD`. Level: `GROUP`
- `SYSTEM RELOAD CONFIG`. Level: `GLOBAL`. Aliases: `RELOAD CONFIG`
- `SYSTEM RELOAD DICTIONARY`. Level: `GLOBAL`. Aliases: `SYSTEM RELOAD DICTIONARIES`, `RELOAD DICTIONARY`, `RELOAD DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`. Level: `GLOBAL`. Aliases: R`ELOAD EMBEDDED DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`. Level: `GLOBAL`. Aliases: `RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM MERGES`. Level: `TABLE`. Aliases: `SYSTEM STOP MERGES`, `SYSTEM START MERGES`, `STOP MERGES`, `START MERGES`
- `SYSTEM TTL MERGES`. Level: `TABLE`. Aliases: `SYSTEM STOP TTL MERGES`, `SYSTEM START TTL MERGES`, `STOP TTL MERGES`, `START TTL MERGES`
- `SYSTEM FETCHES`. Level: `TABLE`. Aliases: `SYSTEM STOP FETCHES`, `SYSTEM START FETCHES`, `STOP FETCHES`, `START FETCHES`

View File

@ -1147,6 +1147,62 @@ SELECT arrayReverseFill(x -> not isNull(x), [1, null, 3, 11, 12, null, null, 5,
Функция `arrayReverseFill` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayFold(func, arr1, …, init) {#array-fold}
Возвращает результат [сворачивания](https://ru.wikipedia.org/wiki/%D0%A1%D0%B2%D1%91%D1%80%D1%82%D0%BA%D0%B0_%D1%81%D0%BF%D0%B8%D1%81%D0%BA%D0%B0) массивов и начального значения `init` с помощью функции `func`.
Т.е. результат вычисления `func(arr1[n], …, func(arr1[n - 1], …, func(…, func(arr1[2], …, func(arr1[1], …, init)))))`.
Функция `arrayFold` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
**Аргументы**
- `func` — лямбда-функция с `n+1` параметром (где `n` это количество входных массивов), причём первые `n` параметров
используются для текущих элементов входных массивов, а последний элемент для текущего значения аккумулятора.
- `arr` — произвольное количество [массивов](../../sql-reference/data-types/array.md).
- `init` - начальное значение аккумулятора.
**Возвращаемое значение**
Итоговое значение аккумулятора.
**Примеры**
Следующий пример показывает, как вычислить произведение и сумму элементов массива:
``` sql
SELECT arrayMap(x, accum -> (accum.1 * x, accum.2 + x), [1, 2, 3], (0, 1)) as res;
```
``` text
┌─res───────┐
│ (120, 15) │
└───────────┘
```
В этом примере показано, как обратить массив:
``` sql
SELECT arrayFold(x, acc -> arrayPushFront(acc, x), [1,2,3,4,5], emptyArrayUInt64()) as res;
```
``` text
┌─res─────────┐
│ [5,4,3,2,1] │
└─────────────┘
```
Свёртка может быть использована для доступа к уже пройденным в процессе вычисления элементам. Например:
``` sql
SELECT arrayFold(x, acc -> (x, concat(acc.2, toString(acc.1), ',')), [1,2], (0,''))
```
``` text
┌─res────────┐
│ (2,'0,1,') │
└────────────┘
```
## arraySplit(func, arr1, …) {#array-split}
Разделяет массив `arr1` на несколько. Если `func` возвращает не 0, то массив разделяется, а элемент помещается в левую часть. Массив не разбивается по первому элементу.
@ -1183,6 +1239,7 @@ SELECT arrayReverseSplit((x, y) -> y, [1, 2, 3, 4, 5], [1, 0, 0, 1, 0]) AS res
Функция `arrayReverseSplit` является [функцией высшего порядка](../../sql-reference/functions/index.md#higher-order-functions) — в качестве первого аргумента ей нужно передать лямбда-функцию, и этот аргумент не может быть опущен.
## arrayExists(\[func,\] arr1, …) {#arrayexistsfunc-arr1}
Возвращает 1, если существует хотя бы один элемент массива `arr`, для которого функция func возвращает не 0. Иначе возвращает 0.

View File

@ -5,7 +5,7 @@ toc_title: "Политика доступа"
# CREATE ROW POLICY {#create-row-policy-statement}
Создает [фильтры для строк](../../../operations/access-rights.md#row-policy-management), которые пользователь может прочесть из таблицы.
Создает [политики доступа к строкам](../../../operations/access-rights.md#row-policy-management), т.е. фильтры, которые определяют, какие строки пользователь может читать из таблицы.
Синтаксис:
@ -13,33 +13,74 @@ toc_title: "Политика доступа"
CREATE [ROW] POLICY [IF NOT EXISTS | OR REPLACE] policy_name1 [ON CLUSTER cluster_name1] ON [db1.]table1
[, policy_name2 [ON CLUSTER cluster_name2] ON [db2.]table2 ...]
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING condition]
[FOR SELECT] USING condition
[TO {role [,...] | ALL | ALL EXCEPT role [,...]}]
```
Секция `ON CLUSTER` позволяет создавать фильтры для строк на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
## Секция USING {#create-row-policy-using}
## Секция AS {#create-row-policy-as}
С помощью данной секции можно создать политику разрешения или ограничения.
Политика разрешения предоставляет доступ к строкам. Разрешительные политики, которые применяются к одной таблице, объединяются с помощью логического оператора `OR`. Политики являются разрешительными по умолчанию.
Политика ограничения запрещает доступ к строкам. Ограничительные политики, которые применяются к одной таблице, объединяются логическим оператором `AND`.
Ограничительные политики применяются к строкам, прошедшим фильтр разрешительной политики. Если вы не зададите разрешительные политики, пользователь не сможет обращаться ни к каким строкам из таблицы.
Секция `USING` указывает условие для фильтрации строк. Пользователь может видеть строку, если это условие, вычисленное для строки, дает ненулевой результат.
## Секция TO {#create-row-policy-to}
В секции `TO` вы можете перечислить как роли, так и пользователей. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
В секции `TO` перечисляются пользователи и роли, для которых должна действовать политика. Например, `CREATE ROW POLICY ... TO accountant, john@localhost`.
Ключевым словом `ALL` обозначаются все пользователи, включая текущего. Ключевые слова `ALL EXCEPT` позволяют исключить пользователей из списка всех пользователей. Например, `CREATE ROW POLICY ... TO ALL EXCEPT accountant, john@localhost`
!!! note "Note"
Если для таблицы не задано ни одной политики доступа к строкам, то любой пользователь может выполнить команду SELECT и получить все строки таблицы. Если определить хотя бы одну политику для таблицы, до доступ к строкам будет управляться этими политиками, причем для всех пользователей (даже для тех, для кого политики не определялись). Например, следующая политика
`CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter`
запретит пользователям `mira` и `peter` видеть строки с `b != 1`, и еще запретит всем остальным пользователям (например, пользователю `paul`) видеть какие-либо строки вообще из таблицы `mydb.table1`.
Если это нежелательно, такое поведение можно исправить, определив дополнительную политику:
`CREATE ROW POLICY pol2 ON mydb.table1 USING 1 TO ALL EXCEPT mira, peter`
## Секция AS {#create-row-policy-as}
Может быть одновременно активно более одной политики для одной и той же таблицы и одного и того же пользователя. Поэтому нам нужен способ комбинировать политики.
По умолчанию политики комбинируются с использованием логического оператора `OR`. Например, политики:
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 TO peter, antonio
```
разрешат пользователю с именем `peter` видеть строки, для которых будет верно `b=1` или `c=2`.
Секция `AS` указывает, как политики должны комбинироваться с другими политиками. Политики могут быть или разрешительными (`PERMISSIVE`), или ограничительными (`RESTRICTIVE`). По умолчанию политики создаются разрешительными (`PERMISSIVE`); такие политики комбинируются с использованием логического оператора `OR`.
Ограничительные (`RESTRICTIVE`) политики комбинируются с использованием логического оператора `AND`.
Общая формула выглядит так:
```
строка_видима = (одна или больше permissive-политик дала ненулевой результат проверки условия) И
(все restrictive-политики дали ненулевой результат проверки условия)
```
Например, политики
``` sql
CREATE ROW POLICY pol1 ON mydb.table1 USING b=1 TO mira, peter
CREATE ROW POLICY pol2 ON mydb.table1 USING c=2 AS RESTRICTIVE TO peter, antonio
```
разрешат пользователю с именем `peter` видеть только те строки, для которых будет одновременно `b=1` и `c=2`.
## Секция ON CLUSTER {#create-row-policy-on-cluster}
Секция `ON CLUSTER` позволяет создавать политики на кластере, см. [Распределенные DDL запросы](../../../sql-reference/distributed-ddl.md).
## Примеры
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter1 ON mydb.mytable USING a<1000 TO accountant, john@localhost`
`CREATE ROW POLICY filter ON mydb.mytable FOR SELECT USING a<1000 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter2 ON mydb.mytable USING a<1000 AND b=5 TO ALL EXCEPT mira`
`CREATE ROW POLICY filter3 ON mydb.mytable USING 1 TO admin`
<!--hide-->

View File

@ -93,7 +93,7 @@ GRANT SELECT(x,y) ON db.table TO john WITH GRANT OPTION
- `ALTER ADD CONSTRAINT`
- `ALTER DROP CONSTRAINT`
- `ALTER TTL`
- `ALTER MATERIALIZE TTL`
- `ALTER MATERIALIZE TTL`
- `ALTER SETTINGS`
- `ALTER MOVE PARTITION`
- `ALTER FETCH PARTITION`
@ -104,9 +104,9 @@ GRANT SELECT(x,y) ON db.table TO john WITH GRANT OPTION
- [CREATE](#grant-create)
- `CREATE DATABASE`
- `CREATE TABLE`
- `CREATE TEMPORARY TABLE`
- `CREATE VIEW`
- `CREATE DICTIONARY`
- `CREATE TEMPORARY TABLE`
- [DROP](#grant-drop)
- `DROP DATABASE`
- `DROP TABLE`
@ -152,7 +152,7 @@ GRANT SELECT(x,y) ON db.table TO john WITH GRANT OPTION
- `SYSTEM RELOAD`
- `SYSTEM RELOAD CONFIG`
- `SYSTEM RELOAD DICTIONARY`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM MERGES`
- `SYSTEM TTL MERGES`
- `SYSTEM FETCHES`
@ -279,7 +279,7 @@ GRANT INSERT(x,y) ON db.table TO john
- `ALTER ADD CONSTRAINT`. Уровень: `TABLE`. Алиасы: `ADD CONSTRAINT`
- `ALTER DROP CONSTRAINT`. Уровень: `TABLE`. Алиасы: `DROP CONSTRAINT`
- `ALTER TTL`. Уровень: `TABLE`. Алиасы: `ALTER MODIFY TTL`, `MODIFY TTL`
- `ALTER MATERIALIZE TTL`. Уровень: `TABLE`. Алиасы: `MATERIALIZE TTL`
- `ALTER MATERIALIZE TTL`. Уровень: `TABLE`. Алиасы: `MATERIALIZE TTL`
- `ALTER SETTINGS`. Уровень: `TABLE`. Алиасы: `ALTER SETTING`, `ALTER MODIFY SETTING`, `MODIFY SETTING`
- `ALTER MOVE PARTITION`. Уровень: `TABLE`. Алиасы: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. Уровень: `TABLE`. Алиасы: `FETCH PARTITION`
@ -307,9 +307,9 @@ GRANT INSERT(x,y) ON db.table TO john
- `CREATE`. Уровень: `GROUP`
- `CREATE DATABASE`. Уровень: `DATABASE`
- `CREATE TABLE`. Уровень: `TABLE`
- `CREATE TEMPORARY TABLE`. Уровень: `GLOBAL`
- `CREATE VIEW`. Уровень: `VIEW`
- `CREATE DICTIONARY`. Уровень: `DICTIONARY`
- `CREATE TEMPORARY TABLE`. Уровень: `GLOBAL`
**Дополнительно**
@ -407,7 +407,7 @@ GRANT INSERT(x,y) ON db.table TO john
- `SYSTEM RELOAD`. Уровень: `GROUP`
- `SYSTEM RELOAD CONFIG`. Уровень: `GLOBAL`. Алиасы: `RELOAD CONFIG`
- `SYSTEM RELOAD DICTIONARY`. Уровень: `GLOBAL`. Алиасы: `SYSTEM RELOAD DICTIONARIES`, `RELOAD DICTIONARY`, `RELOAD DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`. Уровень: `GLOBAL`. Алиасы: `RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM RELOAD EMBEDDED DICTIONARIES`. Уровень: `GLOBAL`. Алиасы: `RELOAD EMBEDDED DICTIONARIES`
- `SYSTEM MERGES`. Уровень: `TABLE`. Алиасы: `SYSTEM STOP MERGES`, `SYSTEM START MERGES`, `STOP MERGES`, `START MERGES`
- `SYSTEM TTL MERGES`. Уровень: `TABLE`. Алиасы: `SYSTEM STOP TTL MERGES`, `SYSTEM START TTL MERGES`, `STOP TTL MERGES`, `START TTL MERGES`
- `SYSTEM FETCHES`. Уровень: `TABLE`. Алиасы: `SYSTEM STOP FETCHES`, `SYSTEM START FETCHES`, `STOP FETCHES`, `START FETCHES`

View File

@ -54,9 +54,10 @@ void ODBCHandler::processError(HTTPServerResponse & response, const std::string
void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse & response)
{
HTMLForm params(request);
LOG_TRACE(log, "Request URI: {}", request.getURI());
if (mode == "read")
params.read(request.getStream());
LOG_TRACE(log, "Request URI: {}", request.getURI());
if (mode == "read" && !params.has("query"))
{
@ -64,11 +65,6 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
return;
}
if (!params.has("columns"))
{
processError(response, "No 'columns' in request URL");
return;
}
if (!params.has("connection_string"))
{
@ -76,6 +72,16 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
return;
}
if (!params.has("sample_block"))
{
processError(response, "No 'sample_block' in request URL");
return;
}
std::string format = params.get("format", "RowBinary");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
UInt64 max_block_size = DEFAULT_BLOCK_SIZE;
if (params.has("max_block_size"))
{
@ -88,24 +94,19 @@ void ODBCHandler::handleRequest(HTTPServerRequest & request, HTTPServerResponse
max_block_size = parse<size_t>(max_block_size_str);
}
std::string columns = params.get("columns");
std::string sample_block_string = params.get("sample_block");
std::unique_ptr<Block> sample_block;
try
{
sample_block = parseColumns(std::move(columns));
sample_block = parseColumns(std::move(sample_block_string));
}
catch (const Exception & ex)
{
processError(response, "Invalid 'columns' parameter in request body '" + ex.message() + "'");
LOG_WARNING(log, ex.getStackTraceString());
processError(response, "Invalid 'sample_block' parameter in request body '" + ex.message() + "'");
LOG_ERROR(log, ex.getStackTraceString());
return;
}
std::string format = params.get("format", "RowBinary");
std::string connection_string = params.get("connection_string");
LOG_TRACE(log, "Connection string: '{}'", connection_string);
WriteBufferFromHTTPServerResponse out(response, request.getMethod() == Poco::Net::HTTPRequest::HTTP_HEAD, keep_alive_timeout);
try

View File

@ -89,7 +89,7 @@
<!-- Compatibility with PostgreSQL protocol.
ClickHouse will pretend to be PostgreSQL for applications connecting to this port.
-->
<!-- <postgresql_port>9005</postgresql_port> -->
<postgresql_port>9005</postgresql_port>
<!-- HTTP API with TLS (HTTPS).
You have to configure certificate to enable this interface.

View File

@ -62,7 +62,7 @@ enum class AccessType
enabled implicitly by the grant ALTER_TABLE */\
M(ALTER_SETTINGS, "ALTER SETTING, ALTER MODIFY SETTING, MODIFY SETTING", TABLE, ALTER_TABLE) /* allows to execute ALTER MODIFY SETTING */\
M(ALTER_MOVE_PARTITION, "ALTER MOVE PART, MOVE PARTITION, MOVE PART", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FETCH_PARTITION, "ALTER FETCH PART, FETCH PARTITION", TABLE, ALTER_TABLE) \
M(ALTER_FREEZE_PARTITION, "FREEZE PARTITION, UNFREEZE", TABLE, ALTER_TABLE) \
\
M(ALTER_TABLE, "", GROUP, ALTER) \

View File

@ -37,7 +37,7 @@ class IXDBCBridgeHelper : public IBridgeHelper
public:
explicit IXDBCBridgeHelper(ContextPtr context_) : IBridgeHelper(context_) {}
virtual std::vector<std::pair<std::string, std::string>> getURLParams(const std::string & cols, UInt64 max_block_size) const = 0;
virtual std::vector<std::pair<std::string, std::string>> getURLParams(UInt64 max_block_size) const = 0;
virtual Poco::URI getColumnsInfoURI() const = 0;
@ -138,12 +138,11 @@ protected:
return uri;
}
URLParams getURLParams(const std::string & cols, UInt64 max_block_size) const override
URLParams getURLParams(UInt64 max_block_size) const override
{
std::vector<std::pair<std::string, std::string>> result;
result.emplace_back("connection_string", connection_string); /// already validated
result.emplace_back("columns", cols);
result.emplace_back("max_block_size", std::to_string(max_block_size));
return result;

View File

@ -122,7 +122,7 @@ namespace
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapUniqueIndexImpl(*data_uint64);
else
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got" + column.getName(),
throw Exception("Indexes column for getUniqueIndex must be ColumnUInt, got " + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
}
@ -151,7 +151,7 @@ void ColumnLowCardinality::insertFrom(const IColumn & src, size_t n)
const auto * low_cardinality_src = typeid_cast<const ColumnLowCardinality *>(&src);
if (!low_cardinality_src)
throw Exception("Expected ColumnLowCardinality, got" + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
throw Exception("Expected ColumnLowCardinality, got " + src.getName(), ErrorCodes::ILLEGAL_COLUMN);
size_t position = low_cardinality_src->getIndexes().getUInt(n);

View File

@ -66,7 +66,7 @@ ColumnPtr selectIndexImpl(const Column & column, const IColumn & indexes, size_t
else if (auto * data_uint64 = detail::getIndexesData<UInt64>(indexes))
return column.template indexImpl<UInt64>(*data_uint64, limit);
else
throw Exception("Indexes column for IColumn::select must be ColumnUInt, got" + indexes.getName(),
throw Exception("Indexes column for IColumn::select must be ColumnUInt, got " + indexes.getName(),
ErrorCodes::LOGICAL_ERROR);
}

View File

@ -88,7 +88,6 @@ void checkColumn(
if (num_collisions <= max_collisions_to_print)
{
collisions_str << "Collision:\n";
collisions_str << print_for_row(it->second) << '\n';
collisions_str << print_for_row(i) << std::endl;
}

View File

@ -146,6 +146,9 @@
M(StorageBufferPassedTimeMaxThreshold, "") \
M(StorageBufferPassedRowsMaxThreshold, "") \
M(StorageBufferPassedBytesMaxThreshold, "") \
M(StorageBufferPassedTimeFlushThreshold, "") \
M(StorageBufferPassedRowsFlushThreshold, "") \
M(StorageBufferPassedBytesFlushThreshold, "") \
M(StorageBufferLayerLockReadersWaitMilliseconds, "Time for waiting for Buffer layer during reading") \
M(StorageBufferLayerLockWritersWaitMilliseconds, "Time for waiting free Buffer layer to write to (can be used to tune Buffer layers)") \
\

View File

@ -184,6 +184,10 @@ static void * getCallerAddress(const ucontext_t & context)
# else
return reinterpret_cast<void *>(context.uc_mcontext.gregs[REG_RIP]);
# endif
#elif defined(__APPLE__) && defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext->__ss.__pc);
#elif defined(__aarch64__)
return reinterpret_cast<void *>(context.uc_mcontext.pc);
#elif defined(__powerpc64__)

View File

@ -832,10 +832,13 @@ class NoPasswordAuth : public AuthenticationMethod
{
public:
void authenticate(
const String & /* user_name */,
ContextPtr /* context */,
Messaging::MessageTransport & /* mt */,
const Poco::Net::SocketAddress & /* address */) override {}
const String & user_name,
ContextPtr context,
Messaging::MessageTransport & mt,
const Poco::Net::SocketAddress & address) override
{
setPassword(user_name, "", context, mt, address);
}
Authentication::Type getType() const override
{

View File

@ -252,8 +252,6 @@ class IColumn;
* Almost all limits apply to each stream individually. \
*/ \
\
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
M(UInt64, max_rows_to_read, 0, "Limit on read rows from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(UInt64, max_bytes_to_read, 0, "Limit on read bytes (after decompression) from the most 'deep' sources. That is, only in the deepest subquery. When reading from a remote server, it is only checked on a remote server.", 0) \
M(OverflowMode, read_overflow_mode, OverflowMode::THROW, "What to do when the limit is exceeded.", 0) \
@ -464,6 +462,8 @@ class IColumn;
\
M(Bool, database_replicated_ddl_output, true, "Obsolete setting, does nothing. Will be removed after 2021-09-08", 0) \
M(HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT, "How to handle errors for Kafka engine. Passible values: default, stream.", 0) \
M(UInt64, limit, 0, "Limit on read rows from the most 'end' result for select query, default 0 means no limit length", 0) \
M(UInt64, offset, 0, "Offset on read rows from the most 'end' result for select query", 0) \
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS below.

View File

@ -466,7 +466,7 @@ namespace
else if (auto * data_uint64 = getIndexesData<UInt64>(column))
return mapIndexWithAdditionalKeys(*data_uint64, dict_size);
else
throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got" + column.getName(),
throw Exception("Indexes column for mapIndexWithAdditionalKeys must be UInt, got " + column.getName(),
ErrorCodes::LOGICAL_ERROR);
}
}

View File

@ -58,7 +58,7 @@ std::pair<String, StoragePtr> createTableFromAST(
auto table_function = factory.get(ast_create_query.as_table_function, context);
ColumnsDescription columns;
if (ast_create_query.columns_list && ast_create_query.columns_list->columns)
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
StoragePtr storage = table_function->execute(ast_create_query.as_table_function, context, ast_create_query.table, std::move(columns));
storage->renameInMemory(ast_create_query);
return {ast_create_query.table, storage};
@ -69,7 +69,7 @@ std::pair<String, StoragePtr> createTableFromAST(
if (!ast_create_query.columns_list || !ast_create_query.columns_list->columns)
throw Exception("Missing definition of columns.", ErrorCodes::EMPTY_LIST_OF_COLUMNS_PASSED);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, false);
ColumnsDescription columns = InterpreterCreateQuery::getColumnsDescription(*ast_create_query.columns_list->columns, context, true);
ConstraintsDescription constraints = InterpreterCreateQuery::getConstraintsDescription(ast_create_query.columns_list->constraints);
return

View File

@ -51,6 +51,14 @@ Columns DirectDictionary<dictionary_key_type>::getColumns(
key_to_fetched_index.reserve(requested_keys.size());
auto fetched_columns_from_storage = request.makeAttributesResultColumns();
for (size_t attribute_index = 0; attribute_index < request.attributesSize(); ++attribute_index)
{
if (!request.shouldFillResultColumnWithIndex(attribute_index))
continue;
auto & fetched_column_from_storage = fetched_columns_from_storage[attribute_index];
fetched_column_from_storage->reserve(requested_keys.size());
}
size_t fetched_key_index = 0;

View File

@ -16,12 +16,9 @@
#include "DictionarySourceFactory.h"
#include "DictionaryStructure.h"
#include "readInvalidateQuery.h"
#include "registerDictionaries.h"
#include <Common/escapeForFileName.h>
#if USE_ODBC
# include <Poco/Data/ODBC/Connector.h> // Y_IGNORE
#endif
namespace DB
{
@ -125,7 +122,7 @@ XDBCDictionarySource::XDBCDictionarySource(
{
bridge_url = bridge_helper->getMainURI();
auto url_params = bridge_helper->getURLParams(sample_block_.getNamesAndTypesList().toString(), max_block_size);
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)
bridge_url.addQueryParameter(name, value);
}
@ -151,6 +148,7 @@ XDBCDictionarySource::XDBCDictionarySource(const XDBCDictionarySource & other)
{
}
std::string XDBCDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
@ -167,52 +165,61 @@ std::string XDBCDictionarySource::getUpdateFieldAndDate()
}
}
BlockInputStreamPtr XDBCDictionarySource::loadAll()
{
LOG_TRACE(log, load_all_query);
return loadBase(load_all_query);
return loadFromQuery(bridge_url, sample_block, load_all_query);
}
BlockInputStreamPtr XDBCDictionarySource::loadUpdatedAll()
{
std::string load_query_update = getUpdateFieldAndDate();
LOG_TRACE(log, load_query_update);
return loadBase(load_query_update);
return loadFromQuery(bridge_url, sample_block, load_query_update);
}
BlockInputStreamPtr XDBCDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
const auto query = query_builder.composeLoadIdsQuery(ids);
return loadBase(query);
return loadFromQuery(bridge_url, sample_block, query);
}
BlockInputStreamPtr XDBCDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
const auto query = query_builder.composeLoadKeysQuery(key_columns, requested_rows, ExternalQueryBuilder::AND_OR_CHAIN);
return loadBase(query);
return loadFromQuery(bridge_url, sample_block, query);
}
bool XDBCDictionarySource::supportsSelectiveLoad() const
{
return true;
}
bool XDBCDictionarySource::hasUpdateField() const
{
return !update_field.empty();
}
DictionarySourcePtr XDBCDictionarySource::clone() const
{
return std::make_unique<XDBCDictionarySource>(*this);
}
std::string XDBCDictionarySource::toString() const
{
return bridge_helper->getName() + ": " + db + '.' + table + (where.empty() ? "" : ", where: " + where);
}
bool XDBCDictionarySource::isModified() const
{
if (!invalidate_query.empty())
@ -235,41 +242,38 @@ std::string XDBCDictionarySource::doInvalidateQuery(const std::string & request)
bridge_helper->startBridgeSync();
auto invalidate_url = bridge_helper->getMainURI();
auto url_params = bridge_helper->getURLParams(invalidate_sample_block.getNamesAndTypesList().toString(), max_block_size);
auto url_params = bridge_helper->getURLParams(max_block_size);
for (const auto & [name, value] : url_params)
invalidate_url.addQueryParameter(name, value);
XDBCBridgeBlockInputStream stream(
invalidate_url,
[request](std::ostream & os) { os << "query=" << request; },
invalidate_sample_block,
getContext(),
max_block_size,
timeouts,
bridge_helper->getName() + "BlockInputStream");
return readInvalidateQuery(stream);
return readInvalidateQuery(*loadFromQuery(invalidate_url, invalidate_sample_block, request));
}
BlockInputStreamPtr XDBCDictionarySource::loadBase(const std::string & query) const
BlockInputStreamPtr XDBCDictionarySource::loadFromQuery(const Poco::URI url, const Block & required_sample_block, const std::string & query) const
{
bridge_helper->startBridgeSync();
auto write_body_callback = [required_sample_block, query](std::ostream & os)
{
os << "sample_block=" << escapeForFileName(required_sample_block.getNamesAndTypesList().toString());
os << "&";
os << "query=" << escapeForFileName(query);
};
return std::make_shared<XDBCBridgeBlockInputStream>(
bridge_url,
[query](std::ostream & os) { os << "query=" << query; },
sample_block,
url,
write_body_callback,
required_sample_block,
getContext(),
max_block_size,
timeouts,
bridge_helper->getName() + "BlockInputStream");
}
void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
{
#if USE_ODBC
Poco::Data::ODBC::Connector::registerConnector();
#endif
auto create_table_source = [=](const DictionaryStructure & dict_struct,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
@ -294,6 +298,7 @@ void registerDictionarySourceXDBC(DictionarySourceFactory & factory)
factory.registerSource("odbc", create_table_source);
}
void registerDictionarySourceJDBC(DictionarySourceFactory & factory)
{
auto create_table_source = [=](const DictionaryStructure & /* dict_struct */,

View File

@ -62,7 +62,7 @@ private:
// execute invalidate_query. expects single cell in result
std::string doInvalidateQuery(const std::string & request) const;
BlockInputStreamPtr loadBase(const std::string & query) const;
BlockInputStreamPtr loadFromQuery(const Poco::URI url, const Block & required_sample_block, const std::string & query) const;
Poco::Logger * log;

View File

@ -51,7 +51,6 @@ SRCS(
HierarchyDictionariesUtils.cpp
IPAddressDictionary.cpp
LibraryDictionarySource.cpp
LibraryDictionarySourceExternal.cpp
MongoDBDictionarySource.cpp
MySQLDictionarySource.cpp
PolygonDictionary.cpp

View File

@ -1,9 +1,17 @@
#include <aws/core/client/DefaultRetryStrategy.h>
#if !defined(ARCADIA_BUILD)
#include <Common/config.h>
#endif
#include <IO/ReadHelpers.h>
#include <IO/S3Common.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <common/logger_useful.h>
#if USE_AWS_S3
#include <aws/core/client/DefaultRetryStrategy.h>
#include <IO/S3Common.h>
#include "DiskS3.h"
#include "Disks/DiskCacheWrapper.h"
#include "Disks/DiskFactory.h"
@ -196,3 +204,10 @@ void registerDiskS3(DiskFactory & factory)
}
}
#else
void registerDiskS3(DiskFactory &) {}
#endif

View File

@ -1,5 +1,7 @@
configure_file(config_functions.h.in ${ConfigIncludePath}/config_functions.h)
add_subdirectory(divide)
include(${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake)
add_headers_and_sources(clickhouse_functions .)
@ -25,7 +27,7 @@ target_link_libraries(clickhouse_functions
PRIVATE
${ZLIB_LIBRARIES}
boost::filesystem
libdivide
divide_impl
)
if (OPENSSL_CRYPTO_LIBRARY)

View File

@ -20,7 +20,7 @@ void registerFunctionsCoding(FunctionFactory & factory)
factory.registerFunction<FunctionUUIDNumToString>();
factory.registerFunction<FunctionUUIDStringToNum>();
factory.registerFunction<FunctionHex>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionUnhex>();
factory.registerFunction<FunctionUnhex>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionChar>(FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionBitmaskToArray>();
factory.registerFunction<FunctionToIPv4>();

View File

@ -0,0 +1,187 @@
#include "FunctionArrayMapped.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int SIZES_OF_ARRAYS_DOESNT_MATCH;
extern const int TYPE_MISMATCH;
}
/** arrayFold(x1,...,xn,accum -> expression, array1,...,arrayn, init_accum) - apply the expression to each element of the array (or set of parallel arrays).
*/
class FunctionArrayFold : public IFunction
{
public:
static constexpr auto name = "arrayFold";
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayFold>(); }
String getName() const override { return name; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
void getLambdaArgumentTypes(DataTypes & arguments) const override
{
if (arguments.size() < 3)
throw Exception("Function " + getName() + " needs lambda function, at least one array argument and one accumulator argument.",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
DataTypes nested_types(arguments.size() - 1);
for (size_t i = 0; i < nested_types.size() - 1; ++i)
{
const DataTypeArray * array_type = checkAndGetDataType<DataTypeArray>(&*arguments[i + 1]);
if (!array_type)
throw Exception("Argument " + toString(i + 2) + " of function " + getName() + " must be array. Found "
+ arguments[i + 1]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
nested_types[i] = recursiveRemoveLowCardinality(array_type->getNestedType());
}
nested_types[nested_types.size() - 1] = arguments[arguments.size() - 1];
const DataTypeFunction * function_type = checkAndGetDataType<DataTypeFunction>(arguments[0].get());
if (!function_type || function_type->getArgumentTypes().size() != nested_types.size())
throw Exception("First argument for this overload of " + getName() + " must be a function with "
+ toString(nested_types.size()) + " arguments. Found "
+ arguments[0]->getName() + " instead.", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
arguments[0] = std::make_shared<DataTypeFunction>(nested_types);
}
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
if (arguments.size() < 2)
throw Exception("Function " + getName() + " needs at least 2 arguments; passed "
+ toString(arguments.size()) + ".",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const auto * data_type_function = checkAndGetDataType<DataTypeFunction>(arguments[0].type.get());
if (!data_type_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
auto const accumulator_type = arguments.back().type;
auto const lambda_type = data_type_function->getReturnType();
if (! accumulator_type->equals(*lambda_type))
throw Exception("Return type of lambda function must be the same as the accumulator type. "
"Inferred type of lambda " + lambda_type->getName() + ", "
+ "inferred type of accumulator " + accumulator_type->getName() + ".",
ErrorCodes::TYPE_MISMATCH);
return DataTypePtr(accumulator_type);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /*input_rows_count*/) const override
{
const auto & column_with_type_and_name = arguments[0];
if (!column_with_type_and_name.column)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
const auto * column_function = typeid_cast<const ColumnFunction *>(column_with_type_and_name.column.get());
if (!column_function)
throw Exception("First argument for function " + getName() + " must be a function.",
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
ColumnPtr offsets_column;
ColumnPtr column_first_array_ptr;
const ColumnArray * column_first_array = nullptr;
ColumnsWithTypeAndName arrays;
arrays.reserve(arguments.size() - 1);
for (size_t i = 1; i < arguments.size() - 1; ++i)
{
const auto & array_with_type_and_name = arguments[i];
ColumnPtr column_array_ptr = array_with_type_and_name.column;
const auto * column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
const DataTypePtr & array_type_ptr = array_with_type_and_name.type;
const auto * array_type = checkAndGetDataType<DataTypeArray>(array_type_ptr.get());
if (!column_array)
{
const ColumnConst * column_const_array = checkAndGetColumnConst<ColumnArray>(column_array_ptr.get());
if (!column_const_array)
throw Exception("Expected array column, found " + column_array_ptr->getName(), ErrorCodes::ILLEGAL_COLUMN);
column_array_ptr = recursiveRemoveLowCardinality(column_const_array->convertToFullColumn());
column_array = checkAndGetColumn<ColumnArray>(column_array_ptr.get());
}
if (!array_type)
throw Exception("Expected array type, found " + array_type_ptr->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (!offsets_column)
{
offsets_column = column_array->getOffsetsPtr();
}
else
{
/// The first condition is optimization: do not compare data if the pointers are equal.
if (column_array->getOffsetsPtr() != offsets_column
&& column_array->getOffsets() != typeid_cast<const ColumnArray::ColumnOffsets &>(*offsets_column).getData())
throw Exception("Arrays passed to " + getName() + " must have equal size", ErrorCodes::SIZES_OF_ARRAYS_DOESNT_MATCH);
}
if (i == 1)
{
column_first_array_ptr = column_array_ptr;
column_first_array = column_array;
}
arrays.emplace_back(ColumnWithTypeAndName(column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_with_type_and_name.name));
}
arrays.emplace_back(arguments.back());
MutableColumnPtr result = arguments.back().column->convertToFullColumnIfConst()->cloneEmpty();
size_t arr_cursor = 0;
for (size_t irow = 0; irow < column_first_array->size(); ++irow) // for each row of result
{
// Make accumulator column for this row. We initialize it
// with the starting value given as the last argument.
ColumnWithTypeAndName accumulator_column = arguments.back();
ColumnPtr acc(accumulator_column.column->cut(irow, 1));
auto accumulator = ColumnWithTypeAndName(acc,
accumulator_column.type,
accumulator_column.name);
ColumnPtr res(acc);
size_t const arr_next = column_first_array->getOffsets()[irow]; // when we do folding
for (size_t iter = 0; arr_cursor < arr_next; ++iter, ++arr_cursor)
{
// Make slice of input arrays and accumulator for lambda
ColumnsWithTypeAndName iter_arrays;
iter_arrays.reserve(arrays.size() + 1);
for (size_t icolumn = 0; icolumn < arrays.size() - 1; ++icolumn)
{
auto const & arr = arrays[icolumn];
iter_arrays.emplace_back(ColumnWithTypeAndName(arr.column->cut(arr_cursor, 1),
arr.type,
arr.name));
}
iter_arrays.emplace_back(accumulator);
// Calculate function on arguments
auto replicated_column_function_ptr = IColumn::mutate(column_function->replicate(ColumnArray::Offsets(column_first_array->getOffsets().size(), 1)));
auto * replicated_column_function = typeid_cast<ColumnFunction *>(replicated_column_function_ptr.get());
replicated_column_function->appendArguments(iter_arrays);
auto lambda_result = replicated_column_function->reduce().column;
if (lambda_result->lowCardinality())
lambda_result = lambda_result->convertToFullColumnIfLowCardinality();
res = lambda_result->cut(0, 1);
accumulator.column = res;
}
result->insert((*res)[0]);
}
return result;
}
};
void registerFunctionArrayFold(FunctionFactory & factory)
{
factory.registerFunction<FunctionArrayFold>();
}
}

View File

@ -0,0 +1,22 @@
# A library for integer division by constant with CPU dispatching.
if (ARCH_AMD64)
add_library(divide_impl_sse2 divideImpl.cpp)
target_compile_options(divide_impl_sse2 PRIVATE -msse2 -DNAMESPACE=SSE2)
target_link_libraries(divide_impl_sse2 libdivide)
add_library(divide_impl_avx2 divideImpl.cpp)
target_compile_options(divide_impl_avx2 PRIVATE -mavx2 -DNAMESPACE=AVX2)
target_link_libraries(divide_impl_avx2 libdivide)
set(IMPLEMENTATIONS divide_impl_sse2 divide_impl_avx2)
else ()
add_library(divide_impl_generic divideImpl.cpp)
target_compile_options(divide_impl_generic PRIVATE -DNAMESPACE=Generic)
target_link_libraries(divide_impl_generic libdivide)
set(IMPLEMENTATIONS divide_impl_generic)
endif ()
add_library(divide_impl divide.cpp)
target_link_libraries(divide_impl ${IMPLEMENTATIONS} clickhouse_common_io)

View File

@ -0,0 +1,57 @@
#include "divide.h"
#include <Common/CpuId.h>
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
namespace SSE2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
namespace AVX2
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#else
namespace Generic
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);
}
#endif
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
#if defined(__x86_64__) && !defined(ARCADIA_BUILD)
if (DB::Cpu::CpuFlagsCache::have_AVX2)
AVX2::divideImpl(a_pos, b, c_pos, size);
else if (DB::Cpu::CpuFlagsCache::have_SSE2)
SSE2::divideImpl(a_pos, b, c_pos, size);
#else
Generic::divideImpl(a_pos, b, c_pos, size);
#endif
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);

View File

@ -0,0 +1,6 @@
#pragma once
#include <cstddef>
template <typename A, typename B, typename ResultType>
extern void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size);

View File

@ -0,0 +1,79 @@
/// This translation unit should be compiled multiple times
/// with different values of NAMESPACE and machine flags (sse2, avx2).
#if !defined(NAMESPACE)
#if defined(ARCADIA_BUILD)
#define NAMESPACE Generic
#else
#error "NAMESPACE macro must be defined"
#endif
#endif
#if defined(__AVX2__)
#define REG_SIZE 32
#define LIBDIVIDE_AVX2
#elif defined(__SSE2__)
#define REG_SIZE 16
#define LIBDIVIDE_SSE2
#endif
#include <libdivide.h>
namespace NAMESPACE
{
template <typename A, typename B, typename ResultType>
void divideImpl(const A * __restrict a_pos, B b, ResultType * __restrict c_pos, size_t size)
{
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_simd_register = REG_SIZE / sizeof(A);
const A * a_end_simd = a_pos + size / values_per_simd_register * values_per_simd_register;
while (a_pos < a_end_simd)
{
#if defined(__AVX2__)
_mm256_storeu_si256(reinterpret_cast<__m256i *>(c_pos),
_mm256_loadu_si256(reinterpret_cast<const __m256i *>(a_pos)) / divider);
#else
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
#endif
a_pos += values_per_simd_register;
c_pos += values_per_simd_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
}
template void divideImpl<uint64_t, uint64_t, uint64_t>(const uint64_t * __restrict, uint64_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint32_t, uint64_t>(const uint64_t * __restrict, uint32_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, uint16_t, uint64_t>(const uint64_t * __restrict, uint16_t, uint64_t * __restrict, size_t);
template void divideImpl<uint64_t, char8_t, uint64_t>(const uint64_t * __restrict, char8_t, uint64_t * __restrict, size_t);
template void divideImpl<uint32_t, uint64_t, uint32_t>(const uint32_t * __restrict, uint64_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint32_t, uint32_t>(const uint32_t * __restrict, uint32_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, uint16_t, uint32_t>(const uint32_t * __restrict, uint16_t, uint32_t * __restrict, size_t);
template void divideImpl<uint32_t, char8_t, uint32_t>(const uint32_t * __restrict, char8_t, uint32_t * __restrict, size_t);
template void divideImpl<int64_t, int64_t, int64_t>(const int64_t * __restrict, int64_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int32_t, int64_t>(const int64_t * __restrict, int32_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int16_t, int64_t>(const int64_t * __restrict, int16_t, int64_t * __restrict, size_t);
template void divideImpl<int64_t, int8_t, int64_t>(const int64_t * __restrict, int8_t, int64_t * __restrict, size_t);
template void divideImpl<int32_t, int64_t, int32_t>(const int32_t * __restrict, int64_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int32_t, int32_t>(const int32_t * __restrict, int32_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int16_t, int32_t>(const int32_t * __restrict, int16_t, int32_t * __restrict, size_t);
template void divideImpl<int32_t, int8_t, int32_t>(const int32_t * __restrict, int8_t, int32_t * __restrict, size_t);
}

View File

@ -1,11 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionBinaryArithmetic.h>
#if defined(__SSE2__)
# define LIBDIVIDE_SSE2 1
#endif
#include <libdivide.h>
#include "divide/divide.h"
namespace DB
@ -70,34 +66,11 @@ struct DivideIntegralByConstantImpl
if (unlikely(static_cast<A>(b) == 0))
throw Exception("Division by zero", ErrorCodes::ILLEGAL_DIVISION);
libdivide::divider<A> divider(b);
const A * a_end = a_pos + size;
#if defined(__SSE2__)
static constexpr size_t values_per_sse_register = 16 / sizeof(A);
const A * a_end_sse = a_pos + size / values_per_sse_register * values_per_sse_register;
while (a_pos < a_end_sse)
{
_mm_storeu_si128(reinterpret_cast<__m128i *>(c_pos),
_mm_loadu_si128(reinterpret_cast<const __m128i *>(a_pos)) / divider);
a_pos += values_per_sse_register;
c_pos += values_per_sse_register;
}
#endif
while (a_pos < a_end)
{
*c_pos = *a_pos / divider;
++a_pos;
++c_pos;
}
divideImpl(a_pos, b, c_pos, size);
}
};
/** Specializations are specified for dividing numbers of the type UInt64 and UInt32 by the numbers of the same sign.
/** Specializations are specified for dividing numbers of the type UInt64, UInt32, Int64, Int32 by the numbers of the same sign.
* Can be expanded to all possible combinations, but more code is needed.
*/

View File

@ -35,7 +35,7 @@ public:
{
const auto * type = typeid_cast<const DataTypeLowCardinality *>(arguments[0].get());
if (!type)
throw Exception("First first argument of function lowCardinalityIndexes must be ColumnLowCardinality, but got"
throw Exception("First first argument of function lowCardinalityIndexes must be ColumnLowCardinality, but got "
+ arguments[0]->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return std::make_shared<DataTypeUInt64>();

View File

@ -33,7 +33,7 @@ public:
{
const auto * type = typeid_cast<const DataTypeLowCardinality *>(arguments[0].get());
if (!type)
throw Exception("First first argument of function lowCardinalityKeys must be ColumnLowCardinality, but got"
throw Exception("First first argument of function lowCardinalityKeys must be ColumnLowCardinality, but got "
+ arguments[0]->getName(), ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
return type->getDictionaryType();

View File

@ -4,6 +4,7 @@ namespace DB
class FunctionFactory;
void registerFunctionArrayMap(FunctionFactory & factory);
void registerFunctionArrayFold(FunctionFactory & factory);
void registerFunctionArrayFilter(FunctionFactory & factory);
void registerFunctionArrayCount(FunctionFactory & factory);
void registerFunctionArrayExists(FunctionFactory & factory);
@ -22,6 +23,7 @@ void registerFunctionArrayDifference(FunctionFactory & factory);
void registerFunctionsHigherOrder(FunctionFactory & factory)
{
registerFunctionArrayMap(factory);
registerFunctionArrayFold(factory);
registerFunctionArrayFilter(factory);
registerFunctionArrayCount(factory);
registerFunctionArrayExists(factory);

View File

@ -144,6 +144,7 @@ SRCS(
array/arrayFirst.cpp
array/arrayFirstIndex.cpp
array/arrayFlatten.cpp
array/arrayFold.cpp
array/arrayIntersect.cpp
array/arrayJoin.cpp
array/arrayMap.cpp
@ -229,6 +230,8 @@ SRCS(
defaultValueOfTypeName.cpp
demange.cpp
divide.cpp
divide/divide.cpp
divide/divideImpl.cpp
dumpColumnStructure.cpp
e.cpp
empty.cpp

View File

@ -1,13 +1,17 @@
#pragma once
#include <Common/config.h>
#if USE_AWS_S3
#include <Common/RemoteHostFilter.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/HTTPCommon.h>
#include <IO/S3/SessionAwareIOStream.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/http/HttpClient.h>
#include <aws/core/http/HttpRequest.h>
#include <aws/core/http/standard/StandardHttpResponse.h>
#include <aws/core/client/ClientConfiguration.h> // Y_IGNORE
#include <aws/core/http/HttpClient.h> // Y_IGNORE
#include <aws/core/http/HttpRequest.h> // Y_IGNORE
#include <aws/core/http/standard/StandardHttpResponse.h> // Y_IGNORE
namespace Aws::Http::Standard
{
@ -94,3 +98,5 @@ private:
};
}
#endif

View File

@ -5,8 +5,8 @@
#if USE_AWS_S3
#include <common/types.h>
#include <aws/core/Aws.h>
#include <aws/core/client/ClientConfiguration.h>
#include <aws/core/Aws.h> // Y_IGNORE
#include <aws/core/client/ClientConfiguration.h> // Y_IGNORE
#include <IO/S3/PocoHTTPClient.h>
#include <Poco/URI.h>

View File

@ -161,7 +161,7 @@ INSTANTIATE_TEST_SUITE_P(DateTimeToString, DateTimeToStringParamTestDayNum,
{
"Negative DayNum value wraps as if it was UInt16 due to LUT limitations and to maintain compatibility with existing code.",
DayNum(-10 * 365),
"2106-02-07"
"2139-06-10"
},
})
);

View File

@ -650,7 +650,7 @@ std::optional<NameAndTypePair> ActionsMatcher::getNameAndTypeFromAST(const ASTPt
return NameAndTypePair(child_column_name, node->result_type);
if (!data.only_consts)
throw Exception("Unknown identifier: " + child_column_name + " there are columns: " + data.actions_stack.dumpNames(),
throw Exception("Unknown identifier: " + child_column_name + "; there are columns: " + data.actions_stack.dumpNames(),
ErrorCodes::UNKNOWN_IDENTIFIER);
return {};

View File

@ -834,7 +834,7 @@ void Aggregator::writeToTemporaryFile(AggregatedDataVariants & data_variants, co
ProfileEvents::increment(ProfileEvents::ExternalAggregationCompressedBytes, compressed_bytes);
ProfileEvents::increment(ProfileEvents::ExternalAggregationUncompressedBytes, uncompressed_bytes);
LOG_TRACE(log,
LOG_DEBUG(log,
"Written part in {} sec., {} rows, {} uncompressed, {} compressed,"
" {} uncompressed bytes per row, {} compressed bytes per row, compression rate: {}"
" ({} rows/sec., {}/sec. uncompressed, {}/sec. compressed)",
@ -947,7 +947,7 @@ void Aggregator::writeToTemporaryFileImpl(
/// `data_variants` will not destroy them in the destructor, they are now owned by ColumnAggregateFunction objects.
data_variants.aggregator = nullptr;
LOG_TRACE(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
LOG_DEBUG(log, "Max size of temporary block: {} rows, {}.", max_temporary_block_size_rows, ReadableSize(max_temporary_block_size_bytes));
}
@ -1481,7 +1481,7 @@ BlocksList Aggregator::convertToBlocks(AggregatedDataVariants & data_variants, b
}
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log,
LOG_DEBUG(log,
"Converted aggregated data to blocks. {} rows, {} in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
@ -2109,7 +2109,7 @@ Block Aggregator::mergeBlocks(BlocksList & blocks, bool final)
size_t rows = block.rows();
size_t bytes = block.bytes();
double elapsed_seconds = watch.elapsedSeconds();
LOG_TRACE(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Merged partially aggregated blocks. {} rows, {}. in {} sec. ({} rows/sec., {}/sec.)",
rows, ReadableSize(bytes),
elapsed_seconds, rows / elapsed_seconds,
ReadableSize(bytes / elapsed_seconds));

View File

@ -372,7 +372,20 @@ void DDLWorker::scheduleTasks(bool reinitialized)
}
Strings queue_nodes = zookeeper->getChildren(queue_dir, nullptr, queue_updated_event);
size_t size_before_filtering = queue_nodes.size();
filterAndSortQueueNodes(queue_nodes);
/// The following message is too verbose, but it can be useful too debug mysterious test failures in CI
LOG_TRACE(log, "scheduleTasks: initialized={}, size_before_filtering={}, queue_size={}, "
"entries={}..{}, "
"first_failed_task_name={}, current_tasks_size={},"
"last_current_task={},"
"last_skipped_entry_name={}",
initialized, size_before_filtering, queue_nodes.size(),
queue_nodes.empty() ? "none" : queue_nodes.front(), queue_nodes.empty() ? "none" : queue_nodes.back(),
first_failed_task_name ? *first_failed_task_name : "none", current_tasks.size(),
current_tasks.empty() ? "none" : current_tasks.back()->entry_name,
last_skipped_entry_name ? *last_skipped_entry_name : "none");
if (max_tasks_in_queue < queue_nodes.size())
cleanup_event->set();

View File

@ -363,7 +363,7 @@ ASTPtr InterpreterCreateQuery::formatConstraints(const ConstraintsDescription &
}
ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
const ASTExpressionList & columns_ast, ContextPtr context_, bool sanity_check_compression_codecs)
const ASTExpressionList & columns_ast, ContextPtr context_, bool attach)
{
/// First, deduce implicit types.
@ -372,6 +372,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
ASTPtr default_expr_list = std::make_shared<ASTExpressionList>();
NamesAndTypesList column_names_and_types;
bool make_columns_nullable = !attach && context_->getSettingsRef().data_type_default_nullable;
for (const auto & ast : columns_ast.children)
{
@ -390,8 +391,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (*col_decl.null_modifier)
column_type = makeNullable(column_type);
}
/// XXX: context_ or context ?
else if (context_->getSettingsRef().data_type_default_nullable)
else if (make_columns_nullable)
{
column_type = makeNullable(column_type);
}
@ -436,6 +436,7 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
if (!default_expr_list->children.empty())
defaults_sample_block = validateColumnsDefaultsAndGetSampleBlock(default_expr_list, column_names_and_types, context_);
bool sanity_check_compression_codecs = !attach && !context_->getSettingsRef().allow_suspicious_codecs;
ColumnsDescription res;
auto name_type_it = column_names_and_types.begin();
for (auto ast_it = columns_ast.children.begin(); ast_it != columns_ast.children.end(); ++ast_it, ++name_type_it)
@ -511,8 +512,7 @@ InterpreterCreateQuery::TableProperties InterpreterCreateQuery::setProperties(AS
if (create.columns_list->columns)
{
bool sanity_check_compression_codecs = !create.attach && !getContext()->getSettingsRef().allow_suspicious_codecs;
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), sanity_check_compression_codecs);
properties.columns = getColumnsDescription(*create.columns_list->columns, getContext(), create.attach);
}
if (create.columns_list->indices)

View File

@ -53,7 +53,7 @@ public:
/// Obtain information about columns, their types, default values and column comments,
/// for case when columns in CREATE query is specified explicitly.
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool sanity_check_compression_codecs);
static ColumnsDescription getColumnsDescription(const ASTExpressionList & columns, ContextPtr context, bool attach);
static ConstraintsDescription getConstraintsDescription(const ASTExpressionList * constraints);
static void prepareOnClusterQuery(ASTCreateQuery & create, ContextPtr context, const String & cluster_name);

View File

@ -429,7 +429,7 @@ StoragePtr InterpreterSystemQuery::tryRestartReplica(const StorageID & replica,
auto & create = create_ast->as<ASTCreateQuery &>();
create.attach = true;
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, false);
auto columns = InterpreterCreateQuery::getColumnsDescription(*create.columns_list->columns, system_context, true);
auto constraints = InterpreterCreateQuery::getConstraintsDescription(create.columns_list->constraints);
auto data_path = database->getTableDataPath(create);

View File

@ -245,7 +245,7 @@ void ASTAlterCommand::formatImpl(
else if (type == ASTAlterCommand::FETCH_PARTITION)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << indent_str << "FETCH "
<< "PARTITION " << (settings.hilite ? hilite_none : "");
<< (part ? "PART " : "PARTITION ") << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< " FROM " << (settings.hilite ? hilite_none : "") << DB::quote << from;

View File

@ -142,16 +142,19 @@ antlrcpp::Any ParseTreeVisitor::visitIdentifierOrNull(ClickHouseParser::Identifi
antlrcpp::Any ParseTreeVisitor::visitInterval(ClickHouseParser::IntervalContext *)
{
asm (""); // prevent symbol removal
__builtin_unreachable();
}
antlrcpp::Any ParseTreeVisitor::visitKeyword(ClickHouseParser::KeywordContext *)
{
asm (""); // prevent symbol removal
__builtin_unreachable();
}
antlrcpp::Any ParseTreeVisitor::visitKeywordForAlias(ClickHouseParser::KeywordForAliasContext *)
{
asm (""); // prevent symbol removal
__builtin_unreachable();
}

View File

@ -61,6 +61,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_drop_detached_partition("DROP DETACHED PARTITION");
ParserKeyword s_drop_detached_part("DROP DETACHED PART");
ParserKeyword s_fetch_partition("FETCH PARTITION");
ParserKeyword s_fetch_part("FETCH PART");
ParserKeyword s_replace_partition("REPLACE PARTITION");
ParserKeyword s_freeze("FREEZE");
ParserKeyword s_unfreeze("UNFREEZE");
@ -428,6 +429,21 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_fetch_part.ignore(pos, expected))
{
if (!parser_string_literal.parse(pos, command->partition, expected))
return false;
if (!s_from.ignore(pos, expected))
return false;
ASTPtr ast_from;
if (!parser_string_literal.parse(pos, ast_from, expected))
return false;
command->from = ast_from->as<ASTLiteral &>().value.get<const String &>();
command->part = true;
command->type = ASTAlterCommand::FETCH_PARTITION;
}
else if (s_freeze.ignore(pos, expected))
{
if (s_partition.ignore(pos, expected))

View File

@ -190,7 +190,7 @@ Chunk IRowInputFormat::generate()
if (num_errors && (params.allow_errors_num > 0 || params.allow_errors_ratio > 0))
{
Poco::Logger * log = &Poco::Logger::get("IRowInputFormat");
LOG_TRACE(log, "Skipped {} rows with errors while reading the input stream", num_errors);
LOG_DEBUG(log, "Skipped {} rows with errors while reading the input stream", num_errors);
}
readSuffix();

View File

@ -21,16 +21,13 @@ void MarkdownRowOutputFormat::writePrefix()
}
writeCString("\n|", out);
String left_alignment = ":-|";
String central_alignment = ":-:|";
String right_alignment = "-:|";
for (size_t i = 0; i < columns; ++i)
{
if (isInteger(types[i]))
if (types[i]->shouldAlignRightInPrettyFormats())
writeString(right_alignment, out);
else if (isString(types[i]))
writeString(left_alignment, out);
else
writeString(central_alignment, out);
writeString(left_alignment, out);
}
writeChar('\n', out);
}

View File

@ -214,8 +214,8 @@ IProcessor::Status AggregatingInOrderTransform::prepare()
{
output.push(std::move(to_push_chunk));
output.finish();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {})", src_rows, res_rows,
formatReadableSizeWithBinarySuffix(src_bytes));
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {})",
src_rows, res_rows, formatReadableSizeWithBinarySuffix(src_bytes));
return Status::Finished;
}
if (input.isFinished())

View File

@ -541,7 +541,7 @@ void AggregatingTransform::initGenerate()
double elapsed_seconds = watch.elapsedSeconds();
size_t rows = variants.sizeWithoutOverflowRow();
LOG_TRACE(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
LOG_DEBUG(log, "Aggregated. {} to {} rows (from {}) in {} sec. ({} rows/sec., {}/sec.)",
src_rows, rows, ReadableSize(src_bytes),
elapsed_seconds, src_rows / elapsed_seconds,
ReadableSize(src_bytes / elapsed_seconds));
@ -599,7 +599,7 @@ void AggregatingTransform::initGenerate()
pipe = Pipe::unitePipes(std::move(pipes));
}
LOG_TRACE(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
LOG_DEBUG(log, "Will merge {} temporary files of size {} compressed, {} uncompressed.", files.files.size(), ReadableSize(files.sum_size_compressed), ReadableSize(files.sum_size_uncompressed));
addMergingAggregatedMemoryEfficientTransform(pipe, params, temporary_data_merge_threads);

View File

@ -52,7 +52,7 @@ Chunk MergingAggregatedTransform::generate()
if (!generate_started)
{
generate_started = true;
LOG_TRACE(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
LOG_DEBUG(log, "Read {} blocks of partially aggregated data, total {} rows.", total_input_blocks, total_input_rows);
/// Exception safety. Make iterator valid in case any method below throws.
next_block = blocks.begin();

View File

@ -535,7 +535,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
ReadBufferFromFile in(file_path);
const auto & distributed_header = readDistributedHeader(in, log);
LOG_TRACE(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
LOG_DEBUG(log, "Started processing `{}` ({} rows, {} bytes)", file_path,
formatReadableQuantity(distributed_header.rows),
formatReadableSizeWithBinarySuffix(distributed_header.bytes));
@ -631,7 +631,7 @@ struct StorageDistributedDirectoryMonitor::Batch
Stopwatch watch;
LOG_TRACE(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
LOG_DEBUG(parent.log, "Sending a batch of {} files ({} rows, {} bytes).", file_indices.size(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@ -876,7 +876,7 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
if (!total_rows || !header)
{
LOG_TRACE(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
LOG_DEBUG(log, "Processing batch {} with old format (no header/rows)", in.getFileName());
CompressedReadBuffer decompressing_in(in);
NativeBlockInputStream block_in(decompressing_in, DBMS_TCP_PROTOCOL_VERSION);

View File

@ -2563,7 +2563,7 @@ void MergeTreeData::delayInsertOrThrowIfNeeded(Poco::Event * until) const
ProfileEvents::increment(ProfileEvents::RejectedInserts);
throw Exception(
ErrorCodes::TOO_MANY_PARTS,
"Too many parts ({}). Parts cleaning are processing significantly slower than inserts",
"Too many parts ({}). Merges are processing significantly slower than inserts",
parts_count_in_partition);
}
@ -2909,7 +2909,12 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
throw Exception("Cannot move parts because moves are manually disabled", ErrorCodes::ABORTED);
}
void MergeTreeData::fetchPartition(const ASTPtr & /*partition*/, const StorageMetadataPtr & /*metadata_snapshot*/, const String & /*from*/, ContextPtr /*query_context*/)
void MergeTreeData::fetchPartition(
const ASTPtr & /*partition*/,
const StorageMetadataPtr & /*metadata_snapshot*/,
const String & /*from*/,
bool /*fetch_part*/,
ContextPtr /*query_context*/)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "FETCH PARTITION is not supported by storage {}", getName());
}
@ -2972,7 +2977,7 @@ Pipe MergeTreeData::alterPartition(
break;
case PartitionCommand::FETCH_PARTITION:
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, query_context);
fetchPartition(command.partition, metadata_snapshot, command.from_zookeeper_path, command.part, query_context);
break;
case PartitionCommand::FREEZE_PARTITION:

View File

@ -970,7 +970,12 @@ protected:
virtual void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr context) = 0;
/// Makes sense only for replicated tables
virtual void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context);
virtual void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context);
void writePartLog(
PartLogElement::Type type,

View File

@ -1054,7 +1054,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreams(
false);
/// Let's estimate total number of rows for progress bar.
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows, num_streams);
for (size_t i = 0; i < num_streams; ++i)
{
@ -1576,7 +1576,7 @@ QueryPlanPtr MergeTreeDataSelectExecutor::spreadMarkRangesAmongStreamsFinal(
settings.preferred_block_size_bytes,
false);
LOG_TRACE(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
LOG_DEBUG(log, "Reading approx. {} rows with {} streams", total_rows_in_lonely_parts, num_streams_for_lonely_parts);
for (size_t i = 0; i < num_streams_for_lonely_parts; ++i)
{

View File

@ -182,7 +182,7 @@ bool MergeTreePartsMover::selectPartsForMove(
if (!parts_to_move.empty())
{
LOG_TRACE(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
LOG_DEBUG(log, "Selected {} parts to move according to storage policy rules and {} parts according to TTL rules, {} total", parts_to_move_by_policy_rules, parts_to_move_by_ttl_rules, ReadableSize(parts_to_move_total_size_bytes));
return true;
}
else

View File

@ -47,7 +47,7 @@ MergeTreeReverseSelectProcessor::MergeTreeReverseSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges in reverse order from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -47,7 +47,7 @@ MergeTreeSelectProcessor::MergeTreeSelectProcessor(
size_t total_rows = data_part->index_granularity.getRowsCountInRanges(all_mark_ranges);
if (!quiet)
LOG_TRACE(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
LOG_DEBUG(log, "Reading {} ranges from part {}, approx. {} rows starting from {}",
all_mark_ranges.size(), data_part->name, total_rows,
data_part->index_granularity.getMarkStartingRow(all_mark_ranges.front().begin));

View File

@ -29,10 +29,10 @@ MergeTreeSequentialSource::MergeTreeSequentialSource(
{
/// Print column name but don't pollute logs in case of many columns.
if (columns_to_read.size() == 1)
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part, column {}",
data_part->getMarksCount(), data_part->name, data_part->rows_count, columns_to_read.front());
else
LOG_TRACE(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
LOG_DEBUG(log, "Reading {} marks from part {}, total {} rows starting from the beginning of the part",
data_part->getMarksCount(), data_part->name, data_part->rows_count);
}

View File

@ -342,6 +342,15 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
timed_blocks.begin(), timed_blocks.end(), block_threshold, NodeWithStat::greaterByTime);
auto first_outdated_block = std::min(first_outdated_block_fixed_threshold, first_outdated_block_time_threshold);
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (!num_nodes_to_delete)
return;
auto last_outdated_block = timed_blocks.end() - 1;
LOG_TRACE(log, "Will clear {} old blocks from {} (ctime {}) to {} (ctime {})", num_nodes_to_delete,
first_outdated_block->node, first_outdated_block->ctime,
last_outdated_block->node, last_outdated_block->ctime);
zkutil::AsyncResponses<Coordination::RemoveResponse> try_remove_futures;
for (auto it = first_outdated_block; it != timed_blocks.end(); ++it)
{
@ -372,9 +381,7 @@ void ReplicatedMergeTreeCleanupThread::clearOldBlocks()
first_outdated_block++;
}
auto num_nodes_to_delete = timed_blocks.end() - first_outdated_block;
if (num_nodes_to_delete)
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
LOG_TRACE(log, "Cleared {} old blocks from ZooKeeper", num_nodes_to_delete);
}

View File

@ -82,6 +82,7 @@ std::optional<PartitionCommand> PartitionCommand::parse(const ASTAlterCommand *
res.type = FETCH_PARTITION;
res.partition = command_ast->partition;
res.from_zookeeper_path = command_ast->from;
res.part = command_ast->part;
return res;
}
else if (command_ast->type == ASTAlterCommand::FREEZE_PARTITION)
@ -140,7 +141,10 @@ std::string PartitionCommand::typeToString() const
else
return "DROP DETACHED PARTITION";
case PartitionCommand::Type::FETCH_PARTITION:
return "FETCH PARTITION";
if (part)
return "FETCH PART";
else
return "FETCH PARTITION";
case PartitionCommand::Type::FREEZE_ALL_PARTITIONS:
return "FREEZE ALL";
case PartitionCommand::Type::FREEZE_PARTITION:

View File

@ -40,6 +40,9 @@ namespace ProfileEvents
extern const Event StorageBufferPassedTimeMaxThreshold;
extern const Event StorageBufferPassedRowsMaxThreshold;
extern const Event StorageBufferPassedBytesMaxThreshold;
extern const Event StorageBufferPassedTimeFlushThreshold;
extern const Event StorageBufferPassedRowsFlushThreshold;
extern const Event StorageBufferPassedBytesFlushThreshold;
extern const Event StorageBufferLayerLockReadersWaitMilliseconds;
extern const Event StorageBufferLayerLockWritersWaitMilliseconds;
}
@ -103,6 +106,7 @@ StorageBuffer::StorageBuffer(
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id_,
bool allow_materialized_)
: IStorage(table_id_)
@ -110,6 +114,7 @@ StorageBuffer::StorageBuffer(
, num_shards(num_shards_), buffers(num_shards_)
, min_thresholds(min_thresholds_)
, max_thresholds(max_thresholds_)
, flush_thresholds(flush_thresholds_)
, destination_id(destination_id_)
, allow_materialized(allow_materialized_)
, log(&Poco::Logger::get("StorageBuffer (" + table_id_.getFullTableName() + ")"))
@ -542,7 +547,7 @@ public:
{
if (storage.destination_id)
{
LOG_TRACE(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
LOG_DEBUG(storage.log, "Writing block with {} rows, {} bytes directly.", rows, bytes);
storage.writeBlockToDestination(block, destination);
}
return;
@ -602,7 +607,7 @@ private:
{
buffer.data = sorted_block.cloneEmpty();
}
else if (storage.checkThresholds(buffer, current_time, sorted_block.rows(), sorted_block.bytes()))
else if (storage.checkThresholds(buffer, /* direct= */true, current_time, sorted_block.rows(), sorted_block.bytes()))
{
/** If, after inserting the buffer, the constraints are exceeded, then we will reset the buffer.
* This also protects against unlimited consumption of RAM, since if it is impossible to write to the table,
@ -713,7 +718,7 @@ bool StorageBuffer::supportsPrewhere() const
return false;
}
bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows, size_t additional_bytes) const
bool StorageBuffer::checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows, size_t additional_bytes) const
{
time_t time_passed = 0;
if (buffer.first_write_time)
@ -722,11 +727,11 @@ bool StorageBuffer::checkThresholds(const Buffer & buffer, time_t current_time,
size_t rows = buffer.data.rows() + additional_rows;
size_t bytes = buffer.data.bytes() + additional_bytes;
return checkThresholdsImpl(rows, bytes, time_passed);
return checkThresholdsImpl(direct, rows, bytes, time_passed);
}
bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const
bool StorageBuffer::checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const
{
if (time_passed > min_thresholds.time && rows > min_thresholds.rows && bytes > min_thresholds.bytes)
{
@ -752,6 +757,27 @@ bool StorageBuffer::checkThresholdsImpl(size_t rows, size_t bytes, time_t time_p
return true;
}
if (!direct)
{
if (flush_thresholds.time && time_passed > flush_thresholds.time)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedTimeFlushThreshold);
return true;
}
if (flush_thresholds.rows && rows > flush_thresholds.rows)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedRowsFlushThreshold);
return true;
}
if (flush_thresholds.bytes && bytes > flush_thresholds.bytes)
{
ProfileEvents::increment(ProfileEvents::StorageBufferPassedBytesFlushThreshold);
return true;
}
}
return false;
}
@ -785,7 +811,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (check_thresholds)
{
if (!checkThresholdsImpl(rows, bytes, time_passed))
if (!checkThresholdsImpl(/* direct= */false, rows, bytes, time_passed))
return;
}
else
@ -804,7 +830,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
if (!destination_id)
{
LOG_TRACE(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows (discarded), {} bytes, age {} seconds {}.", rows, bytes, time_passed, (check_thresholds ? "(bg)" : "(direct)"));
return;
}
@ -841,7 +867,7 @@ void StorageBuffer::flushBuffer(Buffer & buffer, bool check_thresholds, bool loc
}
UInt64 milliseconds = watch.elapsedMilliseconds();
LOG_TRACE(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
LOG_DEBUG(log, "Flushing buffer with {} rows, {} bytes, age {} seconds, took {} ms {}.", rows, bytes, time_passed, milliseconds, (check_thresholds ? "(bg)" : "(direct)"));
}
@ -1040,16 +1066,17 @@ void registerStorageBuffer(StorageFactory & factory)
*
* db, table - in which table to put data from buffer.
* num_buckets - level of parallelism.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer.
* min_time, max_time, min_rows, max_rows, min_bytes, max_bytes - conditions for flushing the buffer,
* flush_time, flush_rows, flush_bytes - conditions for flushing.
*/
factory.registerStorage("Buffer", [](const StorageFactory::Arguments & args)
{
ASTs & engine_args = args.engine_args;
if (engine_args.size() != 9)
throw Exception("Storage Buffer requires 9 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes.",
if (engine_args.size() < 9 || engine_args.size() > 12)
throw Exception("Storage Buffer requires from 9 to 12 parameters: "
" destination_database, destination_table, num_buckets, min_time, max_time, min_rows, max_rows, min_bytes, max_bytes[, flush_time, flush_rows, flush_bytes].",
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
// Table and database name arguments accept expressions, evaluate them.
@ -1058,7 +1085,7 @@ void registerStorageBuffer(StorageFactory & factory)
// After we evaluated all expressions, check that all arguments are
// literals.
for (size_t i = 0; i < 9; i++)
for (size_t i = 0; i < engine_args.size(); i++)
{
if (!typeid_cast<ASTLiteral *>(engine_args[i].get()))
{
@ -1068,17 +1095,29 @@ void registerStorageBuffer(StorageFactory & factory)
}
}
String destination_database = engine_args[0]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[1]->as<ASTLiteral &>().value.safeGet<String>();
size_t i = 0;
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[2]->as<ASTLiteral &>().value);
String destination_database = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
String destination_table = engine_args[i++]->as<ASTLiteral &>().value.safeGet<String>();
Int64 min_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[3]->as<ASTLiteral &>().value);
Int64 max_time = applyVisitor(FieldVisitorConvertToNumber<Int64>(), engine_args[4]->as<ASTLiteral &>().value);
UInt64 min_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[5]->as<ASTLiteral &>().value);
UInt64 max_rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[6]->as<ASTLiteral &>().value);
UInt64 min_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[7]->as<ASTLiteral &>().value);
UInt64 max_bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[8]->as<ASTLiteral &>().value);
UInt64 num_buckets = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
StorageBuffer::Thresholds min;
StorageBuffer::Thresholds max;
StorageBuffer::Thresholds flush;
min.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
min.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
max.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.time = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.rows = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
if (engine_args.size() > i)
flush.bytes = applyVisitor(FieldVisitorConvertToNumber<UInt64>(), engine_args[i++]->as<ASTLiteral &>().value);
/// If destination_id is not set, do not write data from the buffer, but simply empty the buffer.
StorageID destination_id = StorageID::createEmpty();
@ -1094,8 +1133,7 @@ void registerStorageBuffer(StorageFactory & factory)
args.constraints,
args.getContext(),
num_buckets,
StorageBuffer::Thresholds{min_time, min_rows, min_bytes},
StorageBuffer::Thresholds{max_time, max_rows, max_bytes},
min, max, flush,
destination_id,
static_cast<bool>(args.getLocalContext()->getSettingsRef().insert_allow_materialized_columns));
},

View File

@ -35,6 +35,10 @@ namespace DB
* Thresholds can be exceeded. For example, if max_rows = 1 000 000, the buffer already had 500 000 rows,
* and a part of 800 000 rows is added, then there will be 1 300 000 rows in the buffer, and then such a block will be written to the subordinate table.
*
* There are also separate thresholds for flush, those thresholds are checked only for non-direct flush.
* This maybe useful if you do not want to add extra latency for INSERT queries,
* so you can set max_rows=1e6 and flush_rows=500e3, then each 500e3 rows buffer will be flushed in background only.
*
* When you destroy a Buffer table, all remaining data is flushed to the subordinate table.
* The data in the buffer is not replicated, not logged to disk, not indexed. With a rough restart of the server, the data is lost.
*/
@ -45,12 +49,11 @@ friend class BufferSource;
friend class BufferBlockOutputStream;
public:
/// Thresholds.
struct Thresholds
{
time_t time; /// The number of seconds from the insertion of the first row into the block.
size_t rows; /// The number of rows in the block.
size_t bytes; /// The number of (uncompressed) bytes in the block.
time_t time = 0; /// The number of seconds from the insertion of the first row into the block.
size_t rows = 0; /// The number of rows in the block.
size_t bytes = 0; /// The number of (uncompressed) bytes in the block.
};
std::string getName() const override { return "Buffer"; }
@ -135,6 +138,7 @@ private:
const Thresholds min_thresholds;
const Thresholds max_thresholds;
const Thresholds flush_thresholds;
StorageID destination_id;
bool allow_materialized;
@ -153,8 +157,8 @@ private:
/// are exceeded. If reset_block_structure is set - clears inner block
/// structure inside buffer (useful in OPTIMIZE and ALTER).
void flushBuffer(Buffer & buffer, bool check_thresholds, bool locked = false, bool reset_block_structure = false);
bool checkThresholds(const Buffer & buffer, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(size_t rows, size_t bytes, time_t time_passed) const;
bool checkThresholds(const Buffer & buffer, bool direct, time_t current_time, size_t additional_rows = 0, size_t additional_bytes = 0) const;
bool checkThresholdsImpl(bool direct, size_t rows, size_t bytes, time_t time_passed) const;
/// `table` argument is passed, as it is sometimes evaluated beforehand. It must match the `destination`.
void writeBlockToDestination(const Block & block, StoragePtr table);
@ -177,6 +181,7 @@ protected:
size_t num_shards_,
const Thresholds & min_thresholds_,
const Thresholds & max_thresholds_,
const Thresholds & flush_thresholds_,
const StorageID & destination_id,
bool allow_materialized_);
};

View File

@ -130,6 +130,7 @@ namespace ErrorCodes
extern const int UNKNOWN_POLICY;
extern const int NO_SUCH_DATA_PART;
extern const int INTERSERVER_SCHEME_DOESNT_MATCH;
extern const int DUPLICATE_DATA_PART;
}
namespace ActionLocks
@ -5356,11 +5357,11 @@ void StorageReplicatedMergeTree::getReplicaDelays(time_t & out_absolute_delay, t
}
}
void StorageReplicatedMergeTree::fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from_,
bool fetch_part,
ContextPtr query_context)
{
Macros::MacroExpansionInfo info;
@ -5373,40 +5374,54 @@ void StorageReplicatedMergeTree::fetchPartition(
if (from.empty())
throw Exception("ZooKeeper path should not be empty", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
String partition_id = getPartitionIDFromQuery(partition, query_context);
zkutil::ZooKeeperPtr zookeeper;
if (auxiliary_zookeeper_name != default_zookeeper_name)
{
zookeeper = getContext()->getAuxiliaryZooKeeper(auxiliary_zookeeper_name);
LOG_INFO(log, "Will fetch partition {} from shard {} (auxiliary zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
}
else
{
zookeeper = getZooKeeper();
LOG_INFO(log, "Will fetch partition {} from shard {}", partition_id, from_);
}
if (from.back() == '/')
from.resize(from.size() - 1);
if (fetch_part)
{
String part_name = partition->as<ASTLiteral &>().value.safeGet<String>();
auto part_path = findReplicaHavingPart(part_name, from, zookeeper);
if (part_path.empty())
throw Exception(ErrorCodes::NO_REPLICA_HAS_PART, "Part {} does not exist on any replica", part_name);
/** Let's check that there is no such part in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a part may appear a little later.
*/
if (checkIfDetachedPartExists(part_name))
throw Exception(ErrorCodes::DUPLICATE_DATA_PART, "Detached part " + part_name + " already exists.");
LOG_INFO(log, "Will fetch part {} from shard {} (zookeeper '{}')", part_name, from_, auxiliary_zookeeper_name);
try
{
/// part name , metadata, part_path , true, 0, zookeeper
if (!fetchPart(part_name, metadata_snapshot, part_path, true, 0, zookeeper))
throw Exception(ErrorCodes::UNFINISHED, "Failed to fetch part {} from {}", part_name, from_);
}
catch (const DB::Exception & e)
{
if (e.code() != ErrorCodes::RECEIVED_ERROR_FROM_REMOTE_IO_SERVER && e.code() != ErrorCodes::RECEIVED_ERROR_TOO_MANY_REQUESTS
&& e.code() != ErrorCodes::CANNOT_READ_ALL_DATA)
throw;
LOG_INFO(log, e.displayText());
}
return;
}
String partition_id = getPartitionIDFromQuery(partition, query_context);
LOG_INFO(log, "Will fetch partition {} from shard {} (zookeeper '{}')", partition_id, from_, auxiliary_zookeeper_name);
/** Let's check that there is no such partition in the `detached` directory (where we will write the downloaded parts).
* Unreliable (there is a race condition) - such a partition may appear a little later.
*/
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version)
&& part_info.partition_id == partition_id)
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
}
}
if (checkIfDetachedPartitionExists(partition_id))
throw Exception("Detached partition " + partition_id + " already exists.", ErrorCodes::PARTITION_ALREADY_EXISTS);
zkutil::Strings replicas;
zkutil::Strings active_replicas;
@ -6913,4 +6928,46 @@ String StorageReplicatedMergeTree::getSharedDataReplica(
return best_replica;
}
String StorageReplicatedMergeTree::findReplicaHavingPart(
const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_)
{
Strings replicas = zookeeper_->getChildren(zookeeper_path_ + "/replicas");
/// Select replicas in uniformly random order.
std::shuffle(replicas.begin(), replicas.end(), thread_local_rng);
for (const String & replica : replicas)
{
if (zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/parts/" + part_name)
&& zookeeper_->exists(zookeeper_path_ + "/replicas/" + replica + "/is_active"))
return zookeeper_path_ + "/replicas/" + replica;
}
return {};
}
bool StorageReplicatedMergeTree::checkIfDetachedPartExists(const String & part_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
if (dir_it.name() == part_name)
return true;
return false;
}
bool StorageReplicatedMergeTree::checkIfDetachedPartitionExists(const String & partition_name)
{
Poco::DirectoryIterator dir_end;
for (const std::string & path : getDataPaths())
{
for (Poco::DirectoryIterator dir_it{path + "detached/"}; dir_it != dir_end; ++dir_it)
{
MergeTreePartInfo part_info;
if (MergeTreePartInfo::tryParsePartName(dir_it.name(), &part_info, format_version) && part_info.partition_id == partition_name)
return true;
}
}
return false;
}
}

View File

@ -522,8 +522,11 @@ private:
/** Returns an empty string if no one has a part.
*/
String findReplicaHavingPart(const String & part_name, bool active);
static String findReplicaHavingPart(const String & part_name, const String & zookeeper_path_, zkutil::ZooKeeper::Ptr zookeeper_);
bool checkReplicaHavePart(const String & replica, const String & part_name);
bool checkIfDetachedPartExists(const String & part_name);
bool checkIfDetachedPartitionExists(const String & partition_name);
/** Find replica having specified part or any part that covers it.
* If active = true, consider only active replicas.
@ -626,7 +629,12 @@ private:
PartitionCommandsResultInfo attachPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, bool part, ContextPtr query_context) override;
void replacePartitionFrom(const StoragePtr & source_table, const ASTPtr & partition, bool replace, ContextPtr query_context) override;
void movePartitionToTable(const StoragePtr & dest_table, const ASTPtr & partition, ContextPtr query_context) override;
void fetchPartition(const ASTPtr & partition, const StorageMetadataPtr & metadata_snapshot, const String & from, ContextPtr query_context) override;
void fetchPartition(
const ASTPtr & partition,
const StorageMetadataPtr & metadata_snapshot,
const String & from,
bool fetch_part,
ContextPtr query_context) override;
/// Check granularity of already existing replicated table in zookeeper if it exists
/// return true if it's fixed

View File

@ -14,6 +14,8 @@
#include <Storages/StorageURL.h>
#include <Storages/transformQueryForExternalDatabase.h>
#include <common/logger_useful.h>
#include <Common/escapeForFileName.h>
namespace DB
{
@ -53,24 +55,18 @@ std::string StorageXDBC::getReadMethod() const
}
std::vector<std::pair<std::string, std::string>> StorageXDBC::getReadURIParams(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const Names & /* column_names */,
const StorageMetadataPtr & /* metadata_snapshot */,
const SelectQueryInfo & /*query_info*/,
ContextPtr /*context*/,
QueryProcessingStage::Enum & /*processed_stage*/,
size_t max_block_size) const
{
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
return bridge_helper->getURLParams(cols.toString(), max_block_size);
return bridge_helper->getURLParams(max_block_size);
}
std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
const Names & /*column_names*/,
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
const SelectQueryInfo & query_info,
ContextPtr local_context,
@ -84,7 +80,21 @@ std::function<void(std::ostream &)> StorageXDBC::getReadPOSTDataCallback(
remote_table_name,
local_context);
return [query](std::ostream & os) { os << "query=" << query; };
NamesAndTypesList cols;
for (const String & name : column_names)
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
auto write_body_callback = [query, cols](std::ostream & os)
{
os << "sample_block=" << escapeForFileName(cols.toString());
os << "&";
os << "query=" << escapeForFileName(query);
};
return write_body_callback;
}
Pipe StorageXDBC::read(
@ -106,20 +116,17 @@ BlockOutputStreamPtr StorageXDBC::write(const ASTPtr & /*query*/, const StorageM
{
bridge_helper->startBridgeSync();
NamesAndTypesList cols;
Poco::URI request_uri = uri;
request_uri.setPath("/write");
for (const String & name : metadata_snapshot->getSampleBlock().getNames())
{
auto column_data = metadata_snapshot->getColumns().getPhysical(name);
cols.emplace_back(column_data.name, column_data.type);
}
auto url_params = bridge_helper->getURLParams(cols.toString(), 65536);
auto url_params = bridge_helper->getURLParams(65536);
for (const auto & [param, value] : url_params)
request_uri.addQueryParameter(param, value);
request_uri.addQueryParameter("db_name", remote_database_name);
request_uri.addQueryParameter("table_name", remote_table_name);
request_uri.addQueryParameter("format_name", format_name);
request_uri.addQueryParameter("sample_block", metadata_snapshot->getSampleBlock().getNamesAndTypesList().toString());
return std::make_shared<StorageURLBlockOutputStream>(
request_uri,

View File

@ -25,7 +25,7 @@ ColumnsDescription parseColumnsListFromString(const std::string & structure, Con
if (!columns_list)
throw Exception("Could not cast AST to ASTExpressionList", ErrorCodes::LOGICAL_ERROR);
return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, !settings.allow_suspicious_codecs);
return InterpreterCreateQuery::getColumnsDescription(*columns_list, context, false);
}
}

View File

@ -156,6 +156,7 @@
"extractURLParameterNames"
"extractURLParameters"
"FETCH PARTITION"
"FETCH PART"
"FINAL"
"FIRST"
"firstSignificantSubdomain"

View File

@ -1,5 +1,3 @@
import pytest
from helpers.client import QueryRuntimeException
from helpers.cluster import ClickHouseCluster
@ -18,23 +16,33 @@ def start_cluster():
cluster.shutdown()
def test_fetch_part_from_allowed_zookeeper(start_cluster):
@pytest.mark.parametrize(
('part', 'date', 'part_name'),
[
('PARTITION', '2020-08-27', '2020-08-27'),
('PART', '2020-08-28', '20200828_0_0_0'),
]
)
def test_fetch_part_from_allowed_zookeeper(start_cluster, part, date, part_name):
node.query(
"CREATE TABLE simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/0/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query("INSERT INTO simple VALUES ('2020-08-27', 1)")
node.query("""INSERT INTO simple VALUES ('{date}', 1)""".format(date=date))
node.query(
"CREATE TABLE simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
"CREATE TABLE IF NOT EXISTS simple2 (date Date, id UInt32) ENGINE = ReplicatedMergeTree('/clickhouse/tables/1/simple', 'node') ORDER BY tuple() PARTITION BY date;"
)
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper2:/clickhouse/tables/0/simple';"
)
node.query("ALTER TABLE simple2 ATTACH PARTITION '2020-08-27';")
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper2:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
node.query("""ALTER TABLE simple2 ATTACH {part} '{part_name}';""".format(part=part, part_name=part_name))
with pytest.raises(QueryRuntimeException):
node.query(
"ALTER TABLE simple2 FETCH PARTITION '2020-08-27' FROM 'zookeeper:/clickhouse/tables/0/simple';"
)
"""ALTER TABLE simple2 FETCH {part} '{part_name}' FROM 'zookeeper:/clickhouse/tables/0/simple';""".format(
part=part, part_name=part_name))
assert node.query("SELECT id FROM simple2").strip() == "1"
assert node.query("""SELECT id FROM simple2 where date = '{date}'""".format(date=date)).strip() == "1"

View File

@ -505,3 +505,59 @@ def test_concurrent_queries(started_cluster):
node1.query('DROP TABLE test_pg_table;')
cursor.execute('DROP TABLE clickhouse.test_pg_table;')
def test_odbc_long_column_names(started_cluster):
conn = get_postgres_conn();
cursor = conn.cursor()
column_name = "column" * 8
create_table = "CREATE TABLE clickhouse.test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} integer".format(column_name + str(i))
create_table += ")"
cursor.execute(create_table)
insert = "INSERT INTO clickhouse.test_long_column_names SELECT i" + ", i" * 999 + " FROM generate_series(0, 99) as t(i)"
cursor.execute(insert)
conn.commit()
create_table = "CREATE TABLE test_long_column_names ("
for i in range(1000):
if i != 0:
create_table += ", "
create_table += "{} UInt32".format(column_name + str(i))
create_table += ") ENGINE=ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_long_column_names')"
result = node1.query(create_table);
result = node1.query('SELECT * FROM test_long_column_names');
expected = node1.query("SELECT number" + ", number" * 999 + " FROM numbers(100)")
assert(result == expected)
cursor.execute("DROP TABLE IF EXISTS clickhouse.test_long_column_names")
node1.query("DROP TABLE IF EXISTS test_long_column_names")
def test_odbc_long_text(started_cluster):
conn = get_postgres_conn()
cursor = conn.cursor()
cursor.execute("drop table if exists clickhouse.test_long_text")
cursor.execute("create table clickhouse.test_long_text(flen int, field1 text)");
# sample test from issue 9363
text_from_issue = """BEGIN These examples only show the order that data is arranged in. The values from different columns are stored separately, and data from the same column is stored together. Examples of a column-oriented DBMS: Vertica, Paraccel (Actian Matrix and Amazon Redshift), Sybase IQ, Exasol, Infobright, InfiniDB, MonetDB (VectorWise and Actian Vector), LucidDB, SAP HANA, Google Dremel, Google PowerDrill, Druid, and kdb+. Different orders for storing data are better suited to different scenarios. The data access scenario refers to what queries are made, how often, and in what proportion; how much data is read for each type of query rows, columns, and bytes; the relationship between reading and updating data; the working size of the data and how locally it is used; whether transactions are used, and how isolated they are; requirements for data replication and logical integrity; requirements for latency and throughput for each type of query, and so on. The higher the load on the system, the more important it is to customize the system set up to match the requirements of the usage scenario, and the more fine grained this customization becomes. There is no system that is equally well-suited to significantly different scenarios. If a system is adaptable to a wide set of scenarios, under a high load, the system will handle all the scenarios equally poorly, or will work well for just one or few of possible scenarios. Key Properties of OLAP Scenario¶ The vast majority of requests are for read access. Data is updated in fairly large batches (> 1000 rows), not by single rows; or it is not updated at all. Data is added to the DB but is not modified. For reads, quite a large number of rows are extracted from the DB, but only a small subset of columns. Tables are "wide," meaning they contain a large number of columns. Queries are relatively rare (usually hundreds of queries per server or less per second). For simple queries, latencies around 50 ms are allowed. Column values are fairly small: numbers and short strings (for example, 60 bytes per URL). Requires high throughput when processing a single query (up to billions of rows per second per server). Transactions are not necessary. Low requirements for data consistency. There is one large table per query. All tables are small, except for one. A query result is significantly smaller than the source data. In other words, data is filtered or aggregated, so the result fits in a single server"s RAM. It is easy to see that the OLAP scenario is very different from other popular scenarios (such as OLTP or Key-Value access). So it doesn"t make sense to try to use OLTP or a Key-Value DB for processing analytical queries if you want to get decent performance. For example, if you try to use MongoDB or Redis for analytics, you will get very poor performance compared to OLAP databases. Why Column-Oriented Databases Work Better in the OLAP Scenario¶ Column-oriented databases are better suited to OLAP scenarios: they are at least 100 times faster in processing most queries. The reasons are explained in detail below, but the fact is easier to demonstrate visually. END"""
cursor.execute("""insert into clickhouse.test_long_text (flen, field1) values (3248, '{}')""".format(text_from_issue));
node1.query('''
DROP TABLE IF EXISTS test_long_test;
CREATE TABLE test_long_text (flen UInt32, field1 String)
ENGINE = ODBC('DSN=postgresql_odbc; Servername=postgre-sql.local', 'clickhouse', 'test_long_text')''')
result = node1.query("select field1 from test_long_text;")
assert(result.strip() == text_from_issue)
long_text = "text" * 1000000
cursor.execute("""insert into clickhouse.test_long_text (flen, field1) values (400000, '{}')""".format(long_text));
result = node1.query("select field1 from test_long_text where flen=400000;")
assert(result.strip() == long_text)

View File

@ -26,6 +26,7 @@
<disk>s31</disk>
</external>
</volumes>
<move_factor>0.0</move_factor>
</hybrid>
</policies>
</storage_configuration>

View File

@ -36,6 +36,15 @@ def get_large_objects_count(cluster, size=100):
return counter
def wait_for_large_objects_count(cluster, expected, size=100, timeout=30):
while timeout > 0:
if get_large_objects_count(cluster, size) == expected:
return
timeout -= 1
time.sleep(1)
assert get_large_objects_count(cluster, size) == expected
@pytest.mark.parametrize(
"policy", ["s3"]
)
@ -67,23 +76,15 @@ def test_s3_zero_copy_replication(cluster, policy):
assert node1.query("SELECT * FROM s3_test order by id FORMAT Values") == "(0,'data'),(1,'data'),(2,'data'),(3,'data')"
# Based on version 20.x - two parts
assert get_large_objects_count(cluster) == 2
wait_for_large_objects_count(cluster, 2)
node1.query("OPTIMIZE TABLE s3_test")
time.sleep(1)
# Based on version 20.x - after merge, two old parts and one merged
assert get_large_objects_count(cluster) == 3
wait_for_large_objects_count(cluster, 3)
# Based on version 20.x - after cleanup - only one merged part
countdown = 60
while countdown > 0:
if get_large_objects_count(cluster) == 1:
break
time.sleep(1)
countdown -= 1
assert get_large_objects_count(cluster) == 1
wait_for_large_objects_count(cluster, 1, timeout=60)
node1.query("DROP TABLE IF EXISTS s3_test NO DELAY")
node2.query("DROP TABLE IF EXISTS s3_test NO DELAY")
@ -127,7 +128,7 @@ def test_s3_zero_copy_on_hybrid_storage(cluster):
assert node2.query("SELECT partition_id,disk_name FROM system.parts WHERE table='hybrid_test' FORMAT Values") == "('all','s31')"
# Check that after moving partition on node2 no new obects on s3
assert get_large_objects_count(cluster, 0) == s3_objects
wait_for_large_objects_count(cluster, s3_objects, size=0)
assert node1.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"
assert node2.query("SELECT * FROM hybrid_test ORDER BY id FORMAT Values") == "(0,'data'),(1,'data')"

View File

@ -0,0 +1,4 @@
<test>
<query>SELECT arrayFold(x, acc -> acc + 1, range(100000), toUInt64(0))</query> <!-- count -->
<query>SELECT arrayFold(x, acc -> acc + x, range(100000), toUInt64(0))</query> <!-- sum -->
</test>

View File

@ -55,14 +55,14 @@
INSERT INTO simple_key_direct_dictionary_source_table
SELECT number, number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<fill_query>
INSERT INTO complex_key_direct_dictionary_source_table
SELECT number, toString(number), number, toString(number), toDecimal64(number, 8), toString(number)
FROM system.numbers
LIMIT 100000;
LIMIT 50000;
</fill_query>
<substitutions>
@ -79,47 +79,51 @@
<substitution>
<name>elements_count</name>
<values>
<value>25000</value>
<value>50000</value>
<value>75000</value>
<value>100000</value>
</values>
</substitution>
</substitutions>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictGet('default.simple_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.simple_key_direct_dictionary', number)
WITH rand64() % toUInt64({elements_count}) as key
SELECT dictHas('default.simple_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', {column_name}, key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictGet('default.complex_key_direct_dictionary', ('value_int', 'value_string', 'value_decimal', 'value_string_nullable'), key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;
</query>
<query>
SELECT dictHas('default.complex_key_direct_dictionary', (number, toString(number)))
WITH (number, toString(number)) as key
SELECT dictHas('default.complex_key_direct_dictionary', key)
FROM system.numbers
LIMIT {elements_count}
FORMAT Null;

Some files were not shown because too many files have changed in this diff Show More