Merge branch 'master' into ddl_queries_fixes

This commit is contained in:
Alexander Tokmakov 2020-10-13 19:52:56 +03:00
commit 0ca51ed266
453 changed files with 3409 additions and 2714 deletions

View File

@ -1,3 +1,5 @@
#pragma once
#include <string>
#include <common/types.h>

View File

@ -1,6 +1,9 @@
/// Original is here https://github.com/cerevra/int
#pragma once
/// Original is here https://github.com/cerevra/int
/// Distributed under the Boost Software License, Version 1.0.
/// (See at http://www.boost.org/LICENSE_1_0.txt)
#include "throwError.h"
namespace wide

View File

@ -1,3 +1,5 @@
#pragma once
#include <optional>
#include <string>
#include <Poco/AutoPtr.h>

View File

@ -1,9 +1,9 @@
# This strings autochanged from release_lib.sh:
SET(VERSION_REVISION 54441)
SET(VERSION_REVISION 54442)
SET(VERSION_MAJOR 20)
SET(VERSION_MINOR 10)
SET(VERSION_MINOR 11)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 11a247d2f42010c1a17bf678c3e00a4bc89b23f8)
SET(VERSION_DESCRIBE v20.10.1.1-prestable)
SET(VERSION_STRING 20.10.1.1)
SET(VERSION_GITHASH 76a04fb4b4f6cd27ad999baf6dc9a25e88851c42)
SET(VERSION_DESCRIBE v20.11.1.1-prestable)
SET(VERSION_STRING 20.11.1.1)
# end of autochange

View File

@ -26,8 +26,8 @@ if (NOT USE_INTERNAL_ODBC_LIBRARY)
find_path (INCLUDE_ODBC sql.h)
if(LIBRARY_ODBC AND INCLUDE_ODBC)
add_library (unixodbc UNKNOWN IMPORTED)
set_target_properties (unixodbc PROPERTIES IMPORTED_LOCATION ${LIBRARY_ODBC})
add_library (unixodbc INTERFACE)
set_target_properties (unixodbc PROPERTIES INTERFACE_LINK_LIBRARIES ${LIBRARY_ODBC})
set_target_properties (unixodbc PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_ODBC})
set_target_properties (unixodbc PROPERTIES INTERFACE_COMPILE_DEFINITIONS USE_ODBC=1)

View File

@ -58,7 +58,10 @@ if (COMPILER_CLANG)
add_warning(unused-exception-parameter)
add_warning(unused-macros)
add_warning(unused-member-function)
add_warning(zero-as-null-pointer-constant)
# XXX: libstdc++ has some of these for 3way compare
if (USE_LIBCXX)
add_warning(zero-as-null-pointer-constant)
endif()
if (WEVERYTHING)
add_warning(everything)
@ -89,6 +92,11 @@ if (COMPILER_CLANG)
no_warning(weak-template-vtables)
no_warning(weak-vtables)
# XXX: libstdc++ has some of these for 3way compare
if (NOT USE_LIBCXX)
no_warning(zero-as-null-pointer-constant)
endif()
# TODO Enable conversion, sign-conversion, double-promotion warnings.
endif ()
elseif (COMPILER_GCC)
@ -170,6 +178,11 @@ elseif (COMPILER_GCC)
add_cxx_compile_options(-Wunused)
# Warn if vector operation is not implemented via SIMD capabilities of the architecture
add_cxx_compile_options(-Wvector-operation-performance)
# XXX: libstdc++ has some of these for 3way compare
if (USE_LIBCXX)
# Warn when a literal 0 is used as null pointer constant.
add_cxx_compile_options(-Wzero-as-null-pointer-constant)
endif()
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 10)
# XXX: gcc10 stuck with this option while compiling GatherUtils code

View File

@ -26,8 +26,8 @@ if (NOT USE_INTERNAL_HYPERSCAN_LIBRARY)
if (LIBRARY_HYPERSCAN AND INCLUDE_HYPERSCAN)
set (EXTERNAL_HYPERSCAN_LIBRARY_FOUND 1)
add_library (hyperscan UNKNOWN IMPORTED GLOBAL)
set_target_properties (hyperscan PROPERTIES IMPORTED_LOCATION ${LIBRARY_HYPERSCAN})
add_library (hyperscan INTERFACE)
set_target_properties (hyperscan PROPERTIES INTERFACE_LINK_LIBRARIES ${LIBRARY_HYPERSCAN})
set_target_properties (hyperscan PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_HYPERSCAN})
set_property(TARGET hyperscan APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_HYPERSCAN=1)
else ()

2
contrib/jemalloc vendored

@ -1 +1 @@
Subproject commit 026764f19995c53583ab25a3b9c06a2fd74e4689
Subproject commit 93e27e435cac846028da20cd9b0841fbc9110bd2

View File

@ -9,10 +9,6 @@ else()
endif ()
if (NOT ENABLE_JEMALLOC)
if(USE_INTERNAL_JEMALLOC_LIBRARY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal jemalloc with ENABLE_JEMALLOC=OFF")
endif()
add_library(jemalloc INTERFACE)
target_compile_definitions(jemalloc INTERFACE USE_JEMALLOC=0)
@ -24,162 +20,116 @@ if (NOT OS_LINUX)
message (WARNING "jemalloc support on non-linux is EXPERIMENTAL")
endif()
option (USE_INTERNAL_JEMALLOC_LIBRARY "Use internal jemalloc library" ${NOT_UNBUNDLED})
if (OS_LINUX)
# ThreadPool select job randomly, and there can be some threads that had been
# performed some memory heavy task before and will be inactive for some time,
# but until it will became active again, the memory will not be freed since by
# default each thread has it's own arena, but there should be not more then
# 4*CPU arenas (see opt.nareans description).
#
# By enabling percpu_arena number of arenas limited to number of CPUs and hence
# this problem should go away.
#
# muzzy_decay_ms -- use MADV_FREE when available on newer Linuxes, to
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache
set (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE "" CACHE STRING "Change default configuration string of JEMalloc" )
if (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE)
set (JEMALLOC_CONFIG_MALLOC_CONF "${JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE}")
endif()
message (STATUS "jemalloc malloc_conf: ${JEMALLOC_CONFIG_MALLOC_CONF}")
if (NOT USE_INTERNAL_JEMALLOC_LIBRARY)
find_library(LIBRARY_JEMALLOC jemalloc)
find_path(INCLUDE_JEMALLOC jemalloc/jemalloc.h)
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/jemalloc")
if (LIBRARY_JEMALLOC AND INCLUDE_JEMALLOC)
set(EXTERNAL_JEMALLOC_LIBRARY_FOUND 1)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads)
set (CMAKE_REQUIRED_LIBRARIES ${LIBRARY_JEMALLOC} Threads::Threads "dl")
set (CMAKE_REQUIRED_INCLUDES ${INCLUDE_JEMALLOC})
check_cxx_source_compiles (
"
#include <jemalloc/jemalloc.h>
int main() {
free(mallocx(1, 0));
}
"
EXTERNAL_JEMALLOC_LIBRARY_WORKS
)
if (EXTERNAL_JEMALLOC_LIBRARY_WORKS)
add_library (jemalloc STATIC IMPORTED)
set_property (TARGET jemalloc PROPERTY IMPORTED_LOCATION ${LIBRARY_JEMALLOC})
set_property (TARGET jemalloc PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_JEMALLOC})
set_property (TARGET jemalloc PROPERTY INTERFACE_LINK_LIBRARIES Threads::Threads dl)
else()
message (${RECONFIGURE_MESSAGE_LEVEL} "External jemalloc is unusable: ${LIBRARY_JEMALLOC} ${INCLUDE_JEMALLOC}")
endif ()
else()
set(EXTERNAL_JEMALLOC_LIBRARY_FOUND 0)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find system jemalloc")
endif()
set (SRCS
${LIBRARY_DIR}/src/arena.c
${LIBRARY_DIR}/src/background_thread.c
${LIBRARY_DIR}/src/base.c
${LIBRARY_DIR}/src/bin.c
${LIBRARY_DIR}/src/bitmap.c
${LIBRARY_DIR}/src/ckh.c
${LIBRARY_DIR}/src/ctl.c
${LIBRARY_DIR}/src/div.c
${LIBRARY_DIR}/src/extent.c
${LIBRARY_DIR}/src/extent_dss.c
${LIBRARY_DIR}/src/extent_mmap.c
${LIBRARY_DIR}/src/hash.c
${LIBRARY_DIR}/src/hook.c
${LIBRARY_DIR}/src/jemalloc.c
${LIBRARY_DIR}/src/large.c
${LIBRARY_DIR}/src/log.c
${LIBRARY_DIR}/src/malloc_io.c
${LIBRARY_DIR}/src/mutex.c
${LIBRARY_DIR}/src/mutex_pool.c
${LIBRARY_DIR}/src/nstime.c
${LIBRARY_DIR}/src/pages.c
${LIBRARY_DIR}/src/prng.c
${LIBRARY_DIR}/src/prof.c
${LIBRARY_DIR}/src/rtree.c
${LIBRARY_DIR}/src/sc.c
${LIBRARY_DIR}/src/stats.c
${LIBRARY_DIR}/src/sz.c
${LIBRARY_DIR}/src/tcache.c
${LIBRARY_DIR}/src/test_hooks.c
${LIBRARY_DIR}/src/ticker.c
${LIBRARY_DIR}/src/tsd.c
${LIBRARY_DIR}/src/witness.c
${LIBRARY_DIR}/src/safety_check.c
)
if (OS_DARWIN)
list(APPEND SRCS ${LIBRARY_DIR}/src/zone.c)
endif ()
if (NOT EXTERNAL_JEMALLOC_LIBRARY_FOUND OR NOT EXTERNAL_JEMALLOC_LIBRARY_WORKS)
set(USE_INTERNAL_JEMALLOC_LIBRARY 1)
add_library(jemalloc ${SRCS})
target_include_directories(jemalloc PRIVATE ${LIBRARY_DIR}/include)
target_include_directories(jemalloc SYSTEM PUBLIC include)
if (OS_LINUX)
# ThreadPool select job randomly, and there can be some threads that had been
# performed some memory heavy task before and will be inactive for some time,
# but until it will became active again, the memory will not be freed since by
# default each thread has it's own arena, but there should be not more then
# 4*CPU arenas (see opt.nareans description).
#
# By enabling percpu_arena number of arenas limited to number of CPUs and hence
# this problem should go away.
#
# muzzy_decay_ms -- use MADV_FREE when available on newer Linuxes, to
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache
set (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE "" CACHE STRING "Change default configuration string of JEMalloc" )
if (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE)
set (JEMALLOC_CONFIG_MALLOC_CONF "${JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE}")
endif()
message (STATUS "jemalloc malloc_conf: ${JEMALLOC_CONFIG_MALLOC_CONF}")
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/jemalloc")
set (SRCS
${LIBRARY_DIR}/src/arena.c
${LIBRARY_DIR}/src/background_thread.c
${LIBRARY_DIR}/src/base.c
${LIBRARY_DIR}/src/bin.c
${LIBRARY_DIR}/src/bitmap.c
${LIBRARY_DIR}/src/ckh.c
${LIBRARY_DIR}/src/ctl.c
${LIBRARY_DIR}/src/div.c
${LIBRARY_DIR}/src/extent.c
${LIBRARY_DIR}/src/extent_dss.c
${LIBRARY_DIR}/src/extent_mmap.c
${LIBRARY_DIR}/src/hash.c
${LIBRARY_DIR}/src/hook.c
${LIBRARY_DIR}/src/jemalloc.c
${LIBRARY_DIR}/src/large.c
${LIBRARY_DIR}/src/log.c
${LIBRARY_DIR}/src/malloc_io.c
${LIBRARY_DIR}/src/mutex.c
${LIBRARY_DIR}/src/mutex_pool.c
${LIBRARY_DIR}/src/nstime.c
${LIBRARY_DIR}/src/pages.c
${LIBRARY_DIR}/src/prng.c
${LIBRARY_DIR}/src/prof.c
${LIBRARY_DIR}/src/rtree.c
${LIBRARY_DIR}/src/sc.c
${LIBRARY_DIR}/src/stats.c
${LIBRARY_DIR}/src/sz.c
${LIBRARY_DIR}/src/tcache.c
${LIBRARY_DIR}/src/test_hooks.c
${LIBRARY_DIR}/src/ticker.c
${LIBRARY_DIR}/src/tsd.c
${LIBRARY_DIR}/src/witness.c
${LIBRARY_DIR}/src/safety_check.c
)
if (OS_DARWIN)
list(APPEND SRCS ${LIBRARY_DIR}/src/zone.c)
endif ()
add_library(jemalloc ${SRCS})
target_include_directories(jemalloc PRIVATE ${LIBRARY_DIR}/include)
target_include_directories(jemalloc SYSTEM PUBLIC include)
set (JEMALLOC_INCLUDE_PREFIX)
# OS_
if (OS_LINUX)
set (JEMALLOC_INCLUDE_PREFIX "include_linux")
elseif (OS_FREEBSD)
set (JEMALLOC_INCLUDE_PREFIX "include_freebsd")
elseif (OS_DARWIN)
set (JEMALLOC_INCLUDE_PREFIX "include_darwin")
else ()
message (FATAL_ERROR "internal jemalloc: This OS is not supported")
endif ()
# ARCH_
if (ARCH_AMD64)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_x86_64")
elseif (ARCH_ARM)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_aarch64")
else ()
message (FATAL_ERROR "internal jemalloc: This arch is not supported")
endif ()
configure_file(${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h.in
${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h)
target_include_directories(jemalloc SYSTEM PRIVATE
${CMAKE_CURRENT_BINARY_DIR}/${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(jemalloc PRIVATE -D_GNU_SOURCE)
set (USE_INTERNAL_JEMALLOC_LIBRARY 1)
set (JEMALLOC_INCLUDE_PREFIX)
# OS_
if (OS_LINUX)
set (JEMALLOC_INCLUDE_PREFIX "include_linux")
elseif (OS_FREEBSD)
set (JEMALLOC_INCLUDE_PREFIX "include_freebsd")
elseif (OS_DARWIN)
set (JEMALLOC_INCLUDE_PREFIX "include_darwin")
else ()
message (FATAL_ERROR "internal jemalloc: This OS is not supported")
endif ()
# ARCH_
if (ARCH_AMD64)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_x86_64")
elseif (ARCH_ARM)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_aarch64")
else ()
message (FATAL_ERROR "internal jemalloc: This arch is not supported")
endif ()
configure_file(${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h.in
${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h)
target_include_directories(jemalloc SYSTEM PRIVATE
${CMAKE_CURRENT_BINARY_DIR}/${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(jemalloc PRIVATE -D_GNU_SOURCE)
set_property(TARGET jemalloc APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_JEMALLOC=1)
if (MAKE_STATIC_LIBRARIES)

2
contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit 24b058c356794ef6cc2d31323dc9adf0386652ff
Subproject commit 30552ac527f2c14070d834e171493b2e7f662375

View File

@ -6,8 +6,8 @@ if (NOT USE_INTERNAL_LZ4_LIBRARY)
if (LIBRARY_LZ4 AND INCLUDE_LZ4)
set(EXTERNAL_LZ4_LIBRARY_FOUND 1)
add_library (lz4 UNKNOWN IMPORTED)
set_property (TARGET lz4 PROPERTY IMPORTED_LOCATION ${LIBRARY_LZ4})
add_library (lz4 INTERFACE)
set_property (TARGET lz4 PROPERTY INTERFACE_LINK_LIBRARIES ${LIBRARY_LZ4})
set_property (TARGET lz4 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_LZ4})
set_property (TARGET lz4 APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_XXHASH=0)
else()

2
contrib/openssl vendored

@ -1 +1 @@
Subproject commit 07e9623064508d15dd61367f960ebe7fc9aecd77
Subproject commit 237260dd6a4bca5cb5a321d366a8a9c807957455

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (20.10.1.1) unstable; urgency=low
clickhouse (20.11.1.1) unstable; urgency=low
* Modified source code
-- clickhouse-release <clickhouse-release@yandex-team.ru> Tue, 08 Sep 2020 17:04:39 +0300
-- clickhouse-release <clickhouse-release@yandex-team.ru> Sat, 10 Oct 2020 18:39:55 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:20.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -52,9 +52,9 @@ Optional parameters:
- `rabbitmq_schema` Parameter that must be used if the format requires a schema definition. For example, [Capn Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_queue_base` - Specify a base name for queues that will be declared. By default, queues are declared unique to tables based on db and table names.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
@ -102,12 +102,12 @@ Exchange type options:
- `fanout` - Routing to all tables (where exchange name is the same) regardless of the keys.
- `topic` - Routing is based on patterns with dot-separated keys. Examples: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
- `headers` - Routing is based on `key=value` matches with a setting `x-match=all` or `x-match=any`. Example table key list: `x-match=all,format=logs,type=report,year=2020`.
- `consistent-hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
- `consistent_hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
Setting `rabbitmq_queue_base` may be used for the following cases:
- to let different tables share queues, so that multiple consumers could be registered for the same queues, which makes a better performance. If using `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, the exact match of queues is achieved in case these parameters are the same.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To be able to resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To be able to resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables. Note: it makes sence only if messages are sent with delivery mode 2 - marked 'persistent', durable.
- to reuse queues as they are declared durable and not auto-deleted.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables.
- to reuse queues as they are declared durable and not auto-deleted. (Can be deleted via any of RabbitMQ CLI tools.)
To improve performance, received messages are grouped into blocks the size of [max\_insert\_block\_size](../../../operations/server-configuration-parameters/settings.md#settings-max_insert_block_size). If the block wasnt formed within [stream\_flush\_interval\_ms](../../../operations/server-configuration-parameters/settings.md) milliseconds, the data will be flushed to the table regardless of the completeness of the block.

View File

@ -3,7 +3,7 @@ toc_priority: 42
toc_title: Decimal
---
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimalp-s-decimal32s-decimal64s-decimal128s-decimal256s}
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimal}
Signed fixed-point numbers that keep precision during add, subtract and multiply operations. For division least significant digits are discarded (not rounded).

View File

@ -1495,13 +1495,13 @@ Result:
Returns the current value of a [custom setting](../../operations/settings/index.md#custom_settings).
**Syntax**
**Syntax**
```sql
getSetting('custom_setting');
getSetting('custom_setting');
```
**Parameter**
**Parameter**
- `custom_setting` — The setting name. [String](../../sql-reference/data-types/string.md).
@ -1513,7 +1513,7 @@ getSetting('custom_setting');
```sql
SET custom_a = 123;
SELECT getSetting('custom_a');
SELECT getSetting('custom_a');
```
**Result**
@ -1522,13 +1522,13 @@ SELECT getSetting('custom_a');
123
```
**See Also**
**See Also**
- [Custom Settings](../../operations/settings/index.md#custom_settings)
## isDecimalOverflow {#is-decimal-overflow}
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value is out of its (or specified) precision.
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md) value is out of its (or specified) precision.
**Syntax**
@ -1536,10 +1536,10 @@ Checks whether the [Decimal](../../sql-reference/data-types/decimal.md#decimalp-
isDecimalOverflow(d, [p])
```
**Parameters**
**Parameters**
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
- `p` — precision. Optional. If omitted, the initial presicion of the first argument is used. Using of this paratemer could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md).
- `p` — precision. Optional. If omitted, the initial presicion of the first argument is used. Using of this paratemer could be helpful for data extraction to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Returned values**
@ -1573,9 +1573,9 @@ Returns number of decimal digits you need to represent the value.
countDigits(x)
```
**Parameters**
**Parameters**
- `x` — [Int](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value.
- `x` — [Int](../../sql-reference/data-types/int-uint.md) or [Decimal](../../sql-reference/data-types/decimal.md) value.
**Returned value**

View File

@ -13,7 +13,6 @@ ClickHouse has the [same behavior as C++ programs](https://en.cppreference.com/w
## toInt(8\|16\|32\|64\|128\|256) {#toint8163264128256}
Converts an input value to the [Int](../../sql-reference/data-types/int-uint.md) data type. This function family includes:
- `toInt8(expr)` — Results in the `Int8` data type.
@ -189,9 +188,7 @@ SELECT toDecimal32OrNull(toString(-1.111), 2) AS val, toTypeName(val)
└──────┴────────────────────────────────────────────────────┘
```
## toDecimal(32\|64\|128\'|256)OrZero {#todecimal3264128256orzero}
## toDecimal(32\|64\|128\|256)OrZero {#todecimal3264128256orzero}
Converts an input value to the [Decimal(P,S)](../../sql-reference/data-types/decimal.md) data type. This family of functions include:

View File

@ -121,7 +121,9 @@ Defines storage time for values. Can be specified only for MergeTree-family tabl
## Column Compression Codecs {#codecs}
By default, ClickHouse applies the `lz4` compression method. For `MergeTree`-engine family you can change the default compression method in the [compression](../../../operations/server-configuration-parameters/settings.md#server-settings-compression) section of a server configuration. You can also define the compression method for each individual column in the `CREATE TABLE` query.
By default, ClickHouse applies the `lz4` compression method. For `MergeTree`-engine family you can change the default compression method in the [compression](../../../operations/server-configuration-parameters/settings.md#server-settings-compression) section of a server configuration.
You can also define the compression method for each individual column in the `CREATE TABLE` query.
``` sql
CREATE TABLE codec_example
@ -136,7 +138,18 @@ ENGINE = <Engine>
...
```
If a codec is specified, the default codec doesnt apply. Codecs can be combined in a pipeline, for example, `CODEC(Delta, ZSTD)`. To select the best codec combination for you project, pass benchmarks similar to described in the Altinity [New Encodings to Improve ClickHouse Efficiency](https://www.altinity.com/blog/2019/7/new-encodings-to-improve-clickhouse) article. One thing to note is that codec can't be applied for ALIAS column type.
The `Default` codec can be specified to reference default compression which may dependend on different settings (and properties of data) in runtime.
Example: `value UInt64 CODEC(Default)` - the same as lack of codec specification.
Also you can remove current CODEC from the column and use default compression from config.xml:
``` sql
ALTER TABLE codec_example MODIFY COLUMN float_value CODEC(Default);
```
Codecs can be combined in a pipeline, for example, `CODEC(Delta, Default)`.
To select the best codec combination for you project, pass benchmarks similar to described in the Altinity [New Encodings to Improve ClickHouse Efficiency](https://www.altinity.com/blog/2019/7/new-encodings-to-improve-clickhouse) article. One thing to note is that codec can't be applied for ALIAS column type.
!!! warning "Warning"
You cant decompress ClickHouse database files with external utilities like `lz4`. Instead, use the special [clickhouse-compressor](https://github.com/ClickHouse/ClickHouse/tree/master/programs/compressor) utility.

View File

@ -1,8 +1,3 @@
---
toc_priority: 6
toc_title: RabbitMQ
---
# RabbitMQ {#rabbitmq-engine}
Движок работает с [RabbitMQ](https://www.rabbitmq.com).
@ -27,9 +22,14 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_transactional_channel = 0]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
```
Обязательные параметры:
@ -40,12 +40,17 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
Дополнительные параметры:
- `rabbitmq_exchange_type` тип точки обмена в RabbitMQ: `direct`, `fanout`, `topic`, `headers`, `consistent-hash`. По умолчанию: `fanout`.
- `rabbitmq_exchange_type` тип точки обмена в RabbitMQ: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. По умолчанию: `fanout`.
- `rabbitmq_routing_key_list` список ключей маршрутизации, через запятую.
- `rabbitmq_row_delimiter` символ-разделитель, который завершает сообщение.
- `rabbitmq_schema` опциональный параметр, необходимый, если используется формат, требующий определения схемы. Например, [Capn Proto](https://capnproto.org/) требует путь к файлу со схемой и название корневого объекта `schema.capnp:Message`.
- `rabbitmq_num_consumers` количество потребителей на таблицу. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одного потребителя недостаточна.
- `rabbitmq_num_queues` количество очередей на потребителя. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна. Одна очередь поддерживает до 50 тысяч сообщений одновременно.
- `rabbitmq_transactional_channel` обернутые запросы `INSERT` в транзакциях. По умолчанию: `0`.
- `rabbitmq_num_queues` количество очередей на потребителя. По умолчанию: `1`. Укажите больше потребителей, если пропускная способность одной очереди на потребителя недостаточна.
- `rabbitmq_queue_base` - настройка для имен очередей. Сценарии использования описаны ниже.
- `rabbitmq_persistent` - флаг, от которого зависит настройка 'durable' для сообщений при запросах `INSERT`. По умолчанию: `0`.
- `rabbitmq_skip_broken_messages` максимальное количество некорректных сообщений в блоке. Если `rabbitmq_skip_broken_messages = N`, то движок отбрасывает `N` сообщений, которые не получилось обработать. Одно сообщение в точности соответствует одной записи (строке). Значение по умолчанию 0.
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
Требуемая конфигурация:
@ -90,15 +95,23 @@ Example:
- `fanout` - маршрутизация по всем таблицам, где имя точки обмена совпадает, независимо от ключей.
- `topic` - маршрутизация основана на правилах с ключами, разделенными точками. Например: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
- `headers` - маршрутизация основана на совпадении `key=value` с настройкой `x-match=all` или `x-match=any`. Пример списка ключей таблицы: `x-match=all,format=logs,type=report,year=2020`.
- `consistent-hash` - данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
- `consistent_hash` - данные равномерно распределяются между всеми связанными таблицами, где имя точки обмена совпадает. Обратите внимание, что этот тип обмена должен быть включен с помощью плагина RabbitMQ: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
Если тип точки обмена не задан, по умолчанию используется `fanout`. В таком случае ключи маршрутизации для публикации данных должны быть рандомизированы в диапазоне `[1, num_consumers]` за каждое сообщение/пакет (или в диапазоне `[1, num_consumers * num_queues]`, если `rabbitmq_num_queues` задано). Эта конфигурация таблицы работает быстрее, чем любая другая, особенно когда заданы параметры `rabbitmq_num_consumers` и/или `rabbitmq_num_queues`.
Настройка `rabbitmq_queue_base` может быть использована в следующих случаях:
1. чтобы восстановить чтение из ранее созданных очередей, если оно прекратилось по какой-либо причине, но очереди остались непустыми. Для восстановления чтения из одной конкретной очереди, нужно написать ее имя в `rabbitmq_queue_base` настройку и не указывать настройки `rabbitmq_num_consumers` и `rabbitmq_num_queues`. Чтобы восстановить чтение из всех очередей, которые были созданы для конкретной таблицы, необходимо совпадение следующих настроек: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. По умолчанию, если настройка `rabbitmq_queue_base` не указана, будут использованы уникальные для каждой таблицы имена очередей.
2. чтобы объявить одни и те же очереди для разных таблиц, что позволяет создавать несколько параллельных подписчиков на каждую из очередей. То есть обеспечивается лучшая производительность. В данном случае, для таких таблиц также необходимо совпадение настроек: `rabbitmq_num_consumers`, `rabbitmq_num_queues`.
3. чтобы повторно использовать созданные c `durable` настройкой очереди, так как они не удаляются автоматически (но могут быть удалены с помощью любого RabbitMQ CLI).
Для улучшения производительности полученные сообщения группируются в блоки размера [max\_insert\_block\_size](../../../operations/settings/settings.md#settings-max_insert_block_size). Если блок не удалось сформировать за [stream\_flush\_interval\_ms](../../../operations/settings/settings.md#stream-flush-interval-ms) миллисекунд, то данные будут сброшены в таблицу независимо от полноты блока.
Если параметры`rabbitmq_num_consumers` и/или `rabbitmq_num_queues` заданы вместе с параметром `rabbitmq_exchange_type`:
- плагин `rabbitmq-consistent-hash-exchange` должен быть включен.
- свойство `message_id` должно быть определено (уникальное для каждого сообщения/пакета).
При запросах `INSERT` отправляемым сообщениям добавляются метаданные: `messageID` и флаг `republished` - доступны через заголовки сообщений (headers).
Для запросов чтения и вставки не должна использоваться одна и та же таблица.
Пример:
``` sql
@ -120,3 +133,11 @@ Example:
SELECT key, value FROM daily ORDER BY key;
```
## Virtual Columns {#virtual-columns}
- `_exchange_name` - имя точки обмена RabbitMQ.
- `_channel_id` - идентификатор канала `ChannelID`, на котором было получено сообщение.
- `_delivery_tag` - значение `DeliveryTag` полученного сообщения. Уникально в рамках одного канала.
- `_redelivered` - флаг `redelivered`. (Не равно нулю, если есть возможность, что сообщение было получено более, чем одним каналом.)
- `_message_id` - значение `MessageID` полученного сообщения. Данное поле непусто, если указано в параметрах при отправке сообщения.

View File

@ -468,7 +468,7 @@ clickhouse-client --query "INSERT INTO tutorial.hits_v1 FORMAT TSV" --max_insert
clickhouse-client --query "INSERT INTO tutorial.visits_v1 FORMAT TSV" --max_insert_block_size=100000 < visits_v1.tsv
```
ClickHouse有很多 [要调整的设置](../operations/settings/index.md) 在控制台客户端中指定它们的一种方法是通过参数,我们可以看到 `--max_insert_block_size`. 找出可用的设置,它们意味着什么以及默认值的最简单方法是查询 `system.settings` 表:
ClickHouse有很多 [要调整的设置](../operations/settings/index.md) 在控制台客户端中指定它们的一种方法是通过参数,就像我们看到上面语句中的 `--max_insert_block_size`。找出可用的设置、含义及其默认值的最简单方法是查询 `system.settings` 表:
``` sql
SELECT name, value, changed, description
@ -479,7 +479,7 @@ FORMAT TSV
max_insert_block_size 1048576 0 "The maximum block size for insertion, if we control the creation of blocks for insertion."
```
您也可以 [OPTIMIZE](../sql-reference/statements/misc.md#misc_operations-optimize) 导入后的表。 使用MergeTree-family引擎配置的表总是在后台合并数据部分以优化数据存储或至少检查是否有意义。 这些查询强制表引擎立即进行存储优化,而不是稍后进行一段时间:
您也可以 [OPTIMIZE](../sql-reference/statements/misc.md#misc_operations-optimize) 导入后的表。 使用MergeTree-family引擎配置的表总是在后台合并数据部分以优化数据存储或至少检查是否有意义。 这些查询强制表引擎立即进行存储优化,而不是稍后一段时间执行:
``` bash
clickhouse-client --query "OPTIMIZE TABLE tutorial.hits_v1 FINAL"
@ -521,14 +521,14 @@ WHERE (CounterID = 912887) AND (toYYYYMM(StartDate) = 201403) AND (domain(StartU
ClickHouse集群是一个同质集群。 设置步骤:
1. 在群集的所有计算机上安装ClickHouse服务器
1. 在群集的所有机器上安装ClickHouse服务端
2. 在配置文件中设置群集配置
3. 在每个实例上创建本地表
4. 创建一个 [分布式表](../engines/table-engines/special/distributed.md)
[分布式表](../engines/table-engines/special/distributed.md) 实际上是一种 “view” 到ClickHouse集群的本地表。 从分布式表中选择查询使用集群所有分片的资源执行。 您可以为多个集群指定configs并创建多个分布式表为不同的集群提供视图。
[分布式表](../engines/table-engines/special/distributed.md) 实际上是一种 “视图”映射到ClickHouse集群的本地表。 从分布式表中执行 **SELECT** 查询会使用集群所有分片的资源。 您可以为多个集群指定configs并创建多个分布式表为不同的集群提供视图。
具有三个分片的集群的示例配置,每个分片一个副本:
具有三个分片,每个分片一个副本的集群的示例配置:
``` xml
<remote_servers>
@ -555,7 +555,7 @@ ClickHouse集群是一个同质集群。 设置步骤:
</remote_servers>
```
为了进一步演示,让我们创建一个新的本地表 `CREATE TABLE` 我们用于查询 `hits_v1`,但不同的表名:
为了进一步演示,让我们使用和创建 `hits_v1` 表相同的 `CREATE TABLE` 语句创建一个新的本地表,但表名不同:
``` sql
CREATE TABLE tutorial.hits_local (...) ENGINE = MergeTree() ...
@ -570,14 +570,14 @@ ENGINE = Distributed(perftest_3shards_1replicas, tutorial, hits_local, rand());
常见的做法是在集群的所有计算机上创建类似的分布式表。 它允许在群集的任何计算机上运行分布式查询。 还有一个替代选项可以使用以下方法为给定的SELECT查询创建临时分布式表 [远程](../sql-reference/table-functions/remote.md) 表功能。
我们走吧 [INSERT SELECT](../sql-reference/statements/insert-into.md) 将该表传播到多个服务器。
让我们运行 [INSERT SELECT](../sql-reference/statements/insert-into.md) 将该表传播到多个服务器。
``` sql
INSERT INTO tutorial.hits_all SELECT * FROM tutorial.hits_v1;
```
!!! warning "碌莽禄Notice:"
这种方法不适合大型表的分片。 有一个单独的工具 [ツ环板-ョツ嘉ッツ偲](../operations/utilities/clickhouse-copier.md) 这可以重新分片任意大表。
!!! warning "注意:"
这种方法不适合大型表的分片。 有一个单独的工具 [clickhouse-copier](../operations/utilities/clickhouse-copier.md) 这可以重新分片任意大表。
正如您所期望的那样如果计算量大的查询使用3台服务器而不是一个则运行速度快N倍。
@ -609,10 +609,10 @@ INSERT INTO tutorial.hits_all SELECT * FROM tutorial.hits_v1;
</remote_servers>
```
启用本机复制 [动物园管理员](http://zookeeper.apache.org/) 是必需的。 ClickHouse负责所有副本的数据一致性并在失败后自动运行恢复过程。 建议将ZooKeeper集群部署在单独的服务器上其中没有其他进程包括ClickHouse正在运行)。
启用本机复制 [Zookeeper](http://zookeeper.apache.org/) 是必需的。 ClickHouse负责所有副本的数据一致性并在失败后自动运行恢复过程。 建议将ZooKeeper集群部署在单独的服务器上其中没有其他进程包括运行的ClickHouse
!!! note "注"
ZooKeeper不是一个严格的requirement:在某些简单的情况下,您可以通过将数据写入应用程序代码中的所有副本来复制数据。 这种方法是 **不** 建议在这种情况下ClickHouse将无法保证所有副本上的数据一致性。 因此,它成为您的应用程序的责任
ZooKeeper不是一个严格的要求:在某些简单的情况下,您可以通过将数据写入应用程序代码中的所有副本来复制数据。 这种方法是 **不** 建议在这种情况下ClickHouse将无法保证所有副本上的数据一致性。 因此需要由您的应用来保证这一点
ZooKeeper位置在配置文件中指定:

View File

@ -134,7 +134,6 @@ private:
bool stdout_is_a_tty = false; /// stdout is a terminal.
std::unique_ptr<Connection> connection; /// Connection to DB.
String query_id; /// Current query_id.
String full_query; /// Current query as it was given to the client.
// Current query as it will be sent to the server. It may differ from the
@ -219,6 +218,9 @@ private:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
void initialize(Poco::Util::Application & self) override
{
Poco::Util::Application::initialize(self);
@ -243,6 +245,17 @@ private:
/// Set path for format schema files
if (config().has("format_schema_path"))
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
/// Initialize query_id_formats if any
if (config().has("query_id_formats"))
{
Poco::Util::AbstractConfiguration::Keys keys;
config().keys("query_id_formats", keys);
for (const auto & name : keys)
query_id_formats.emplace_back(name + ":", config().getString("query_id_formats." + name));
}
if (query_id_formats.empty())
query_id_formats.emplace_back("Query id:", " {query_id}\n");
}
@ -559,7 +572,7 @@ private:
if (is_interactive)
{
if (!query_id.empty())
if (config().has("query_id"))
throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
@ -665,7 +678,9 @@ private:
}
else
{
query_id = config().getString("query_id", "");
auto query_id = config().getString("query_id", "");
if (!query_id.empty())
context.setCurrentQueryId(query_id);
if (query_fuzzer_runs)
{
nonInteractiveWithFuzzing();
@ -1274,6 +1289,19 @@ private:
std_out.next();
}
if (is_interactive)
{
// Generate a new query_id
context.setCurrentQueryId("");
for (const auto & query_id_format : query_id_formats)
{
writeString(query_id_format.first, std_out);
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", context.getCurrentQueryId())), std_out);
writeChar('\n', std_out);
std_out.next();
}
}
watch.restart();
processed_rows = 0;
progress.reset();
@ -1399,7 +1427,7 @@ private:
connection->sendQuery(
connection_parameters.timeouts,
query_to_send,
query_id,
context.getCurrentQueryId(),
QueryProcessingStage::Complete,
&context.getSettingsRef(),
&context.getClientInfo(),
@ -1440,7 +1468,7 @@ private:
connection->sendQuery(
connection_parameters.timeouts,
query_to_send,
query_id,
context.getCurrentQueryId(),
QueryProcessingStage::Complete,
&context.getSettingsRef(),
&context.getClientInfo(),

View File

@ -168,6 +168,26 @@ ASTPtr extractOrderBy(const ASTPtr & storage_ast)
throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
}
/// Wraps only identifiers with backticks.
std::string wrapIdentifiersWithBackticks(const ASTPtr & root)
{
if (auto identifier = std::dynamic_pointer_cast<ASTIdentifier>(root))
return backQuote(identifier->name);
if (auto function = std::dynamic_pointer_cast<ASTFunction>(root))
return function->name + '(' + wrapIdentifiersWithBackticks(function->arguments) + ')';
if (auto expression_list = std::dynamic_pointer_cast<ASTExpressionList>(root))
{
Names function_arguments(expression_list->children.size());
for (size_t i = 0; i < expression_list->children.size(); ++i)
function_arguments[i] = wrapIdentifiersWithBackticks(expression_list->children[0]);
return boost::algorithm::join(function_arguments, ", ");
}
throw Exception("Primary key could be represented only as columns or functions from columns.", ErrorCodes::BAD_ARGUMENTS);
}
Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
{
@ -189,13 +209,14 @@ Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
ErrorCodes::BAD_ARGUMENTS);
Names primary_key_columns;
Names sorting_key_columns;
NameSet primary_key_columns_set;
for (size_t i = 0; i < sorting_key_size; ++i)
{
/// Column name could be represented as a f_1(f_2(...f_n(column_name))).
/// Each f_i could take one or more parameters.
/// We will wrap identifiers with backticks to allow non-standart identifier names.
String sorting_key_column = sorting_key_expr_list->children[i]->getColumnName();
sorting_key_columns.push_back(sorting_key_column);
if (i < primary_key_size)
{
@ -208,7 +229,7 @@ Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
if (!primary_key_columns_set.emplace(pk_column).second)
throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
primary_key_columns.push_back(pk_column);
primary_key_columns.push_back(wrapIdentifiersWithBackticks(primary_key_expr_list->children[i]));
}
}

View File

@ -6,6 +6,9 @@
#include <Core/Defines.h>
#include <ext/map.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
@ -269,7 +272,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = Nested::createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast));
primary_key_comma_separated = boost::algorithm::join(extractPrimaryKeyColumnNames(engine_push_ast), ", ");
is_replicated_table = isReplicatedTableEngine(engine_push_ast);
}

View File

@ -578,17 +578,22 @@ void ContextAccess::checkGrantOption(const AccessRightsElements & elements) cons
template <typename Container, typename GetNameFunction>
void ContextAccess::checkAdminOptionImpl(const Container & role_ids, const GetNameFunction & get_name_function) const
bool ContextAccess::checkAdminOptionImpl(bool throw_on_error, const Container & role_ids, const GetNameFunction & get_name_function) const
{
if (isGranted(AccessType::ROLE_ADMIN))
return;
return true;
auto info = getRolesInfo();
if (!info)
{
if (!user)
throw Exception(user_name + ": User has been dropped", ErrorCodes::UNKNOWN_USER);
return;
{
if (throw_on_error)
throw Exception(user_name + ": User has been dropped", ErrorCodes::UNKNOWN_USER);
else
return false;
}
return true;
}
size_t i = 0;
@ -604,38 +609,73 @@ void ContextAccess::checkAdminOptionImpl(const Container & role_ids, const GetNa
String msg = "To execute this query it's necessary to have the role " + backQuoteIfNeed(*role_name) + " granted with ADMIN option";
if (info->enabled_roles.count(role_id))
msg = "Role " + backQuote(*role_name) + " is granted, but without ADMIN option. " + msg;
throw Exception(getUserName() + ": Not enough privileges. " + msg, ErrorCodes::ACCESS_DENIED);
if (throw_on_error)
throw Exception(getUserName() + ": Not enough privileges. " + msg, ErrorCodes::ACCESS_DENIED);
else
return false;
}
return true;
}
bool ContextAccess::hasAdminOption(const UUID & role_id) const
{
return checkAdminOptionImpl(false, to_array(role_id), [this](const UUID & id, size_t) { return manager->tryReadName(id); });
}
bool ContextAccess::hasAdminOption(const UUID & role_id, const String & role_name) const
{
return checkAdminOptionImpl(false, to_array(role_id), [&role_name](const UUID &, size_t) { return std::optional<String>{role_name}; });
}
bool ContextAccess::hasAdminOption(const UUID & role_id, const std::unordered_map<UUID, String> & names_of_roles) const
{
return checkAdminOptionImpl(false, to_array(role_id), [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
}
bool ContextAccess::hasAdminOption(const std::vector<UUID> & role_ids) const
{
return checkAdminOptionImpl(false, role_ids, [this](const UUID & id, size_t) { return manager->tryReadName(id); });
}
bool ContextAccess::hasAdminOption(const std::vector<UUID> & role_ids, const Strings & names_of_roles) const
{
return checkAdminOptionImpl(false, role_ids, [&names_of_roles](const UUID &, size_t i) { return std::optional<String>{names_of_roles[i]}; });
}
bool ContextAccess::hasAdminOption(const std::vector<UUID> & role_ids, const std::unordered_map<UUID, String> & names_of_roles) const
{
return checkAdminOptionImpl(false, role_ids, [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
}
void ContextAccess::checkAdminOption(const UUID & role_id) const
{
checkAdminOptionImpl(to_array(role_id), [this](const UUID & id, size_t) { return manager->tryReadName(id); });
checkAdminOptionImpl(true, to_array(role_id), [this](const UUID & id, size_t) { return manager->tryReadName(id); });
}
void ContextAccess::checkAdminOption(const UUID & role_id, const String & role_name) const
{
checkAdminOptionImpl(to_array(role_id), [&role_name](const UUID &, size_t) { return std::optional<String>{role_name}; });
checkAdminOptionImpl(true, to_array(role_id), [&role_name](const UUID &, size_t) { return std::optional<String>{role_name}; });
}
void ContextAccess::checkAdminOption(const UUID & role_id, const std::unordered_map<UUID, String> & names_of_roles) const
{
checkAdminOptionImpl(to_array(role_id), [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
checkAdminOptionImpl(true, to_array(role_id), [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
}
void ContextAccess::checkAdminOption(const std::vector<UUID> & role_ids) const
{
checkAdminOptionImpl(role_ids, [this](const UUID & id, size_t) { return manager->tryReadName(id); });
checkAdminOptionImpl(true, role_ids, [this](const UUID & id, size_t) { return manager->tryReadName(id); });
}
void ContextAccess::checkAdminOption(const std::vector<UUID> & role_ids, const Strings & names_of_roles) const
{
checkAdminOptionImpl(role_ids, [&names_of_roles](const UUID &, size_t i) { return std::optional<String>{names_of_roles[i]}; });
checkAdminOptionImpl(true, role_ids, [&names_of_roles](const UUID &, size_t i) { return std::optional<String>{names_of_roles[i]}; });
}
void ContextAccess::checkAdminOption(const std::vector<UUID> & role_ids, const std::unordered_map<UUID, String> & names_of_roles) const
{
checkAdminOptionImpl(role_ids, [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
checkAdminOptionImpl(true, role_ids, [&names_of_roles](const UUID & id, size_t) { auto it = names_of_roles.find(id); return (it != names_of_roles.end()) ? it->second : std::optional<String>{}; });
}
}

View File

@ -138,6 +138,13 @@ public:
void checkAdminOption(const std::vector<UUID> & role_ids, const Strings & names_of_roles) const;
void checkAdminOption(const std::vector<UUID> & role_ids, const std::unordered_map<UUID, String> & names_of_roles) const;
bool hasAdminOption(const UUID & role_id) const;
bool hasAdminOption(const UUID & role_id, const String & role_name) const;
bool hasAdminOption(const UUID & role_id, const std::unordered_map<UUID, String> & names_of_roles) const;
bool hasAdminOption(const std::vector<UUID> & role_ids) const;
bool hasAdminOption(const std::vector<UUID> & role_ids, const Strings & names_of_roles) const;
bool hasAdminOption(const std::vector<UUID> & role_ids, const std::unordered_map<UUID, String> & names_of_roles) const;
/// Makes an instance of ContextAccess which provides full access to everything
/// without any limitations. This is used for the global context.
static std::shared_ptr<const ContextAccess> getFullAccess();
@ -183,7 +190,7 @@ private:
void checkAccessImpl2(const AccessFlags & flags, const Args &... args) const;
template <typename Container, typename GetNameFunction>
void checkAdminOptionImpl(const Container & role_ids, const GetNameFunction & get_name_function) const;
bool checkAdminOptionImpl(bool throw_on_error, const Container & role_ids, const GetNameFunction & get_name_function) const;
const AccessControlManager * manager = nullptr;
const Params params;

View File

@ -34,18 +34,23 @@ ext::scope_guard EnabledRoles::subscribeForChanges(const OnChangeHandler & handl
}
void EnabledRoles::setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_)
void EnabledRoles::setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_, ext::scope_guard & notifications)
{
std::vector<OnChangeHandler> handlers_to_notify;
SCOPE_EXIT({ for (const auto & handler : handlers_to_notify) handler(info_); });
std::lock_guard lock{mutex};
if (info && info_ && *info == *info_)
return;
info = info_;
std::vector<OnChangeHandler> handlers_to_notify;
boost::range::copy(handlers, std::back_inserter(handlers_to_notify));
notifications.join(ext::scope_guard([info = info, handlers_to_notify = std::move(handlers_to_notify)]
{
for (const auto & handler : handlers_to_notify)
handler(info);
}));
}
}

View File

@ -43,7 +43,7 @@ private:
friend class RoleCache;
EnabledRoles(const Params & params_);
void setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_);
void setRolesInfo(const std::shared_ptr<const EnabledRolesInfo> & info_, ext::scope_guard & notifications);
const Params params;
mutable std::shared_ptr<const EnabledRolesInfo> info;

View File

@ -65,8 +65,10 @@ RoleCache::~RoleCache() = default;
std::shared_ptr<const EnabledRoles>
RoleCache::getEnabledRoles(const boost::container::flat_set<UUID> & roles, const boost::container::flat_set<UUID> & roles_with_admin_option)
{
std::lock_guard lock{mutex};
/// Declared before `lock` to send notifications after the mutex will be unlocked.
ext::scope_guard notifications;
std::lock_guard lock{mutex};
EnabledRoles::Params params;
params.current_roles = roles;
params.current_roles_with_admin_option = roles_with_admin_option;
@ -80,13 +82,13 @@ RoleCache::getEnabledRoles(const boost::container::flat_set<UUID> & roles, const
}
auto res = std::shared_ptr<EnabledRoles>(new EnabledRoles(params));
collectEnabledRoles(*res);
collectEnabledRoles(*res, notifications);
enabled_roles.emplace(std::move(params), res);
return res;
}
void RoleCache::collectEnabledRoles()
void RoleCache::collectEnabledRoles(ext::scope_guard & notifications)
{
/// `mutex` is already locked.
@ -97,14 +99,14 @@ void RoleCache::collectEnabledRoles()
i = enabled_roles.erase(i);
else
{
collectEnabledRoles(*elem);
collectEnabledRoles(*elem, notifications);
++i;
}
}
}
void RoleCache::collectEnabledRoles(EnabledRoles & enabled)
void RoleCache::collectEnabledRoles(EnabledRoles & enabled, ext::scope_guard & notifications)
{
/// `mutex` is already locked.
@ -119,7 +121,7 @@ void RoleCache::collectEnabledRoles(EnabledRoles & enabled)
collectRoles(*new_info, skip_ids, get_role_function, current_role, true, true);
/// Collect data from the collected roles.
enabled.setRolesInfo(new_info);
enabled.setRolesInfo(new_info, notifications);
}
@ -156,21 +158,27 @@ RolePtr RoleCache::getRole(const UUID & role_id)
void RoleCache::roleChanged(const UUID & role_id, const RolePtr & changed_role)
{
/// Declared before `lock` to send notifications after the mutex will be unlocked.
ext::scope_guard notifications;
std::lock_guard lock{mutex};
auto role_from_cache = cache.get(role_id);
if (!role_from_cache)
return;
role_from_cache->first = changed_role;
cache.update(role_id, role_from_cache);
collectEnabledRoles();
collectEnabledRoles(notifications);
}
void RoleCache::roleRemoved(const UUID & role_id)
{
/// Declared before `lock` to send notifications after the mutex will be unlocked.
ext::scope_guard notifications;
std::lock_guard lock{mutex};
cache.remove(role_id);
collectEnabledRoles();
collectEnabledRoles(notifications);
}
}

View File

@ -23,8 +23,8 @@ public:
const boost::container::flat_set<UUID> & current_roles, const boost::container::flat_set<UUID> & current_roles_with_admin_option);
private:
void collectEnabledRoles();
void collectEnabledRoles(EnabledRoles & enabled);
void collectEnabledRoles(ext::scope_guard & notifications);
void collectEnabledRoles(EnabledRoles & enabled, ext::scope_guard & notifications);
RolePtr getRole(const UUID & role_id);
void roleChanged(const UUID & role_id, const RolePtr & changed_role);
void roleRemoved(const UUID & role_id);

View File

@ -143,13 +143,13 @@ void LinearModelData::updateState()
void LinearModelData::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
const Context & context) const
{
gradient_computer->predict(container, block, offset, limit, arguments, weights, bias, context);
gradient_computer->predict(container, columns, offset, limit, arguments, weights, bias, context);
}
void LinearModelData::returnWeights(IColumn & to) const
@ -449,7 +449,7 @@ void IWeightsUpdater::addToBatch(
void LogisticRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -457,7 +457,7 @@ void LogisticRegression::predict(
Float64 bias,
const Context & /*context*/) const
{
size_t rows_num = block.rows();
size_t rows_num = columns[arguments.front()].column->size();
if (offset > rows_num || offset + limit > rows_num)
throw Exception("Invalid offset and limit for LogisticRegression::predict. "
@ -468,7 +468,7 @@ void LogisticRegression::predict(
for (size_t i = 1; i < arguments.size(); ++i)
{
const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
const ColumnWithTypeAndName & cur_col = columns[arguments[i]];
if (!isNativeNumber(cur_col.type))
throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
@ -518,7 +518,7 @@ void LogisticRegression::compute(
void LinearRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -531,7 +531,7 @@ void LinearRegression::predict(
throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR);
}
size_t rows_num = block.rows();
size_t rows_num = columns[arguments.front()].column->size();
if (offset > rows_num || offset + limit > rows_num)
throw Exception("Invalid offset and limit for LogisticRegression::predict. "
@ -542,7 +542,7 @@ void LinearRegression::predict(
for (size_t i = 1; i < arguments.size(); ++i)
{
const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
const ColumnWithTypeAndName & cur_col = columns[arguments[i]];
if (!isNativeNumber(cur_col.type))
throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);

View File

@ -39,7 +39,7 @@ public:
virtual void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -65,7 +65,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -91,7 +91,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -264,7 +264,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -364,7 +364,7 @@ public:
void predictValues(
ConstAggregateDataPtr place,
IColumn & to,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -382,7 +382,7 @@ public:
throw Exception("Cast of column of predictions is incorrect. getReturnTypeToPredict must return same value as it is casted to",
ErrorCodes::LOGICAL_ERROR);
this->data(place).predict(column->getData(), block, offset, limit, arguments, context);
this->data(place).predict(column->getData(), columns, offset, limit, arguments, context);
}
/** This function is called if aggregate function without State modifier is selected in a query.

View File

@ -1,4 +1,3 @@
#pragma once
#include <DataTypes/DataTypeAggregateFunction.h>

View File

@ -114,7 +114,7 @@ public:
virtual void predictValues(
ConstAggregateDataPtr /* place */,
IColumn & /*to*/,
Block & /*block*/,
ColumnsWithTypeAndName & /*block*/,
size_t /*offset*/,
size_t /*limit*/,
const ColumnNumbers & /*arguments*/,

View File

@ -161,7 +161,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
return res;
}
MutableColumnPtr ColumnAggregateFunction::predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const
MutableColumnPtr ColumnAggregateFunction::predictValues(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, const Context & context) const
{
MutableColumnPtr res = func->getReturnTypeToPredict()->createColumn();
res->reserve(data.size());
@ -172,7 +172,7 @@ MutableColumnPtr ColumnAggregateFunction::predictValues(Block & block, const Col
if (data.size() == 1)
{
/// Case for const column. Predict using single model.
machine_learning_function->predictValues(data[0], *res, block, 0, block.rows(), arguments, context);
machine_learning_function->predictValues(data[0], *res, block, 0, block[arguments.front()].column->size(), arguments, context);
}
else
{

View File

@ -119,7 +119,7 @@ public:
const char * getFamilyName() const override { return "AggregateFunction"; }
TypeIndex getDataType() const override { return TypeIndex::AggregateFunction; }
MutableColumnPtr predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const;
MutableColumnPtr predictValues(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, const Context & context) const;
size_t size() const override
{

View File

@ -187,16 +187,16 @@ ColumnWithTypeAndName ColumnFunction::reduce() const
throw Exception("Cannot call function " + function->getName() + " because is has " + toString(args) +
"arguments but " + toString(captured) + " columns were captured.", ErrorCodes::LOGICAL_ERROR);
Block block(captured_columns);
block.insert({nullptr, function->getReturnType(), ""});
auto columns = captured_columns;
columns.emplace_back(ColumnWithTypeAndName {nullptr, function->getReturnType(), ""});
ColumnNumbers arguments(captured_columns.size());
for (size_t i = 0; i < captured_columns.size(); ++i)
arguments[i] = i;
function->execute(block, arguments, captured_columns.size(), size_);
function->execute(columns, arguments, captured_columns.size(), size_);
return block.getByPosition(captured_columns.size());
return columns[captured_columns.size()];
}
}

View File

@ -1,3 +1,4 @@
#pragma once
/**
* This file implements template methods of IColumn that depend on other types
* we don't want to include.
@ -5,8 +6,6 @@
* implementation.
*/
#pragma once
#include <Columns/IColumn.h>
#include <Common/PODArray.h>

View File

@ -1,7 +1,7 @@
#pragma once
/**
* This file provides forward declarations for Allocator.
*/
#pragma once
template <bool clear_memory_, bool mmap_populate = false>
class Allocator;

View File

@ -1,3 +1,4 @@
#pragma once
#include <Common/Arena.h>
#include <Common/Allocator.h>

View File

@ -34,13 +34,6 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MemoryTrackingInBackgroundProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for background merges, mutations and fetches). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundMoveProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for background moves). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \
M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \
M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \

View File

@ -73,7 +73,7 @@ inline DB::UInt64 intHashCRC32(DB::UInt64 x, DB::UInt64 updated_value)
}
template <typename T>
inline typename std::enable_if<(sizeof(T) > sizeof(DB::UInt64)) && !is_big_int_v<T>, DB::UInt64>::type
inline typename std::enable_if<(sizeof(T) > sizeof(DB::UInt64)), DB::UInt64>::type
intHashCRC32(const T & x, DB::UInt64 updated_value)
{
auto * begin = reinterpret_cast<const char *>(&x);
@ -86,16 +86,6 @@ intHashCRC32(const T & x, DB::UInt64 updated_value)
return updated_value;
}
template <typename T>
inline typename std::enable_if<is_big_int_v<T>, DB::UInt64>::type
intHashCRC32(const T & x, DB::UInt64 updated_value)
{
std::vector<UInt64> parts = DB::BigInt<T>::toIntArray(x);
for (const auto & part : parts)
updated_value = intHashCRC32(part, updated_value);
return updated_value;
}
inline UInt32 updateWeakHash32(const DB::UInt8 * pos, size_t size, DB::UInt32 updated_value)
{
@ -248,22 +238,7 @@ inline size_t hashCRC32(std::enable_if_t<(sizeof(T) <= sizeof(UInt64)), T> key)
template <typename T>
inline size_t hashCRC32(std::enable_if_t<(sizeof(T) > sizeof(UInt64)), T> key)
{
if constexpr (std::is_same_v<T, DB::Int128>)
{
return intHashCRC32(static_cast<UInt64>(key) ^ static_cast<UInt64>(key >> 64));
}
else if constexpr (std::is_same_v<T, DB::UInt128>)
{
return intHashCRC32(key.low ^ key.high);
}
else if constexpr (is_big_int_v<T> && sizeof(T) == 32)
{
return intHashCRC32(static_cast<UInt64>(key) ^
static_cast<UInt64>(key >> 64) ^
static_cast<UInt64>(key >> 128) ^
static_cast<UInt64>(key >> 256));
}
__builtin_unreachable();
return intHashCRC32(key, -1);
}
#define DEFINE_HASH(T) \

View File

@ -1,3 +1,4 @@
#pragma once
#include <array>
namespace Poco { namespace Net { class IPAddress; }}

View File

@ -1,3 +1,4 @@
#pragma once
/**
* This file contains some using-declarations that define various kinds of
* PODArray.

View File

@ -1,3 +1,4 @@
#pragma once
#include <cstdint>

View File

@ -1,3 +1,4 @@
#pragma once
#include <utility>
#include <cstddef>

View File

@ -1,3 +1,4 @@
#pragma once
#if defined(__linux__)
#include <linux/capability.h>

View File

@ -1,3 +1,4 @@
#pragma once
namespace DB
{

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/FunctionFactory.h>
#include <Functions/registerFunctions.h>

View File

@ -279,7 +279,9 @@ int main(int argc, char ** argv)
if (!method || method == 1) test<identity> (n, data.data(), "0: identity");
if (!method || method == 2) test<intHash32> (n, data.data(), "1: intHash32");
#if !defined(__APPLE__) /// The difference in size_t: unsigned long on Linux, unsigned long long on Mac OS.
if (!method || method == 3) test<intHash64> (n, data.data(), "2: intHash64");
#endif
if (!method || method == 4) test<hash3> (n, data.data(), "3: two rounds");
if (!method || method == 5) test<hash4> (n, data.data(), "4: two rounds and two variables");
if (!method || method == 6) test<hash5> (n, data.data(), "5: two rounds with less ops");

View File

@ -150,10 +150,9 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
}
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric memory_metric_, const char *thread_name_)
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_)
: size(size_)
, tasks_metric(tasks_metric_)
, memory_metric(memory_metric_)
, thread_name(thread_name_)
{
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
@ -249,8 +248,6 @@ void BackgroundSchedulePool::threadFunction()
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
if (auto * memory_tracker = CurrentThread::getMemoryTracker())
memory_tracker->setMetric(memory_metric);
while (!shutdown)
{

View File

@ -51,7 +51,7 @@ public:
size_t getNumberOfThreads() const { return size; }
/// thread_name_ cannot be longer then 13 bytes (2 bytes is reserved for "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric memory_metric_, const char *thread_name_);
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_);
~BackgroundSchedulePool();
private:
@ -85,7 +85,6 @@ private:
ThreadGroupStatusPtr thread_group;
CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Metric memory_metric;
std::string thread_name;
void attachToThreadGroup();

View File

@ -1,8 +1,10 @@
#pragma once
#include <common/StringRef.h>
#include <common/unaligned.h>
#include <Core/Types.h>
namespace DB
{
@ -14,8 +16,7 @@ struct BigInt
static StringRef serialize(const T & x, char * pos)
{
//unalignedStore<T>(pos, x);
memcpy(pos, &x, size);
unalignedStore<T>(pos, x);
return StringRef(pos, size);
}
@ -28,20 +29,7 @@ struct BigInt
static T deserialize(const char * pos)
{
//return unalignedLoad<T>(pos);
T res;
memcpy(&res, pos, size);
return res;
}
static std::vector<UInt64> toIntArray(const T & x)
{
std::vector<UInt64> parts(4, 0);
parts[0] = UInt64(x);
parts[1] = UInt64(x >> 64);
parts[2] = UInt64(x >> 128);
parts[4] = UInt64(x >> 192);
return parts;
return unalignedLoad<T>(pos);
}
};

View File

@ -152,6 +152,12 @@ public:
private:
void eraseImpl(size_t position);
void initializeIndexByName();
/// This is needed to allow function execution over data.
/// It is safe because functions does not change column names, so index is unaffected.
/// It is temporary.
friend struct ExpressionAction;
friend class ActionsDAG;
};
using Blocks = std::vector<Block>;

View File

@ -60,14 +60,14 @@ public:
using ArrayA = typename ColVecA::Container;
using ArrayB = typename ColVecB::Container;
DecimalComparison(Block & block, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
DecimalComparison(ColumnsWithTypeAndName & data, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
if (!apply(block, result, col_left, col_right))
if (!apply(data, result, col_left, col_right))
throw Exception("Wrong decimal comparison with " + col_left.type->getName() + " and " + col_right.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}
static bool apply(Block & block, size_t result [[maybe_unused]],
static bool apply(ColumnsWithTypeAndName & data, size_t result [[maybe_unused]],
const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
if constexpr (_actual)
@ -77,7 +77,7 @@ public:
c_res = applyWithScale(col_left.column, col_right.column, shift);
if (c_res)
block.getByPosition(result).column = std::move(c_res);
data[result].column = std::move(c_res);
return true;
}
return false;

View File

@ -902,7 +902,7 @@ public:
{
type_to_method[user_auth_type]->authenticate(user_name, context, mt, address);
mt.send(Messaging::AuthenticationOk(), true);
LOG_INFO(log, "Authentication for user {} was successful.", user_name);
LOG_DEBUG(log, "Authentication for user {} was successful.", user_name);
return;
}

View File

@ -159,6 +159,7 @@ class IColumn;
\
M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
M(Bool, insert_quorum_parallel, false, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \
M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \
@ -463,6 +464,7 @@ class IColumn;
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
#define LIST_OF_SETTINGS(M) \
COMMON_SETTINGS(M) \

View File

@ -1,3 +1,4 @@
#pragma once
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
@ -9,7 +10,6 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>

View File

@ -1,192 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnConst.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/typeid_cast.h>
#include <DataStreams/FilterBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_)
: remove_filter(remove_filter_)
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
{
children.push_back(input);
/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);
filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);
/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u);
}
if (remove_filter)
header.erase(filter_column_name);
}
String FilterBlockInputStream::getName() const { return "Filter"; }
Block FilterBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->execute(totals);
return totals;
}
Block FilterBlockInputStream::getHeader() const
{
return header;
}
Block FilterBlockInputStream::readImpl()
{
Block res;
if (constant_filter_description.always_false)
return removeFilterIfNeed(std::move(res));
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
return {};
/// Until non-empty block after filtering or end of stream.
while (true)
{
res = children.back()->read();
if (!res)
return res;
expression->execute(res);
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
/** It happens that at the stage of analysis of expressions (in sample_block) the columns-constants have not been calculated yet,
* and now - are calculated. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column);
if (constant_filter_description.always_false)
{
res.clear();
return res;
}
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
FilterDescription filter_and_holder(*column);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = 0;
for (size_t i = 0; i < columns; ++i)
{
if (!isColumnConst(*res.safeGetByPosition(i).column))
{
first_non_constant_column = i;
if (first_non_constant_column != static_cast<size_t>(filter_column))
break;
}
}
size_t filtered_rows = 0;
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(*filter_and_holder.data);
}
/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
continue;
/// If all the rows pass through the filter.
if (filtered_rows == filter_and_holder.data->size())
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, 1u);
/// No need to touch the rest of the columns.
return removeFilterIfNeed(std::move(res));
}
/// Filter the rest of the columns.
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);
if (i == static_cast<size_t>(filter_column))
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, 1u);
continue;
}
if (i == first_non_constant_column)
continue;
if (isColumnConst(*current_column.column))
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return removeFilterIfNeed(std::move(res));
}
}
Block FilterBlockInputStream::removeFilterIfNeed(Block && block) const
{
if (block && remove_filter)
block.erase(static_cast<size_t>(filter_column));
return std::move(block);
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Columns/FilterDescription.h>
namespace DB
{
class ExpressionActions;
/** Implements WHERE, HAVING operations.
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
*/
class FilterBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_ = false);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
bool remove_filter;
private:
ExpressionActionsPtr expression;
Block header;
String filter_column_name;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;
Block removeFilterIfNeed(Block && block) const;
};
}

View File

@ -7,7 +7,6 @@ namespace DB
{
/** A stream of blocks from which you can read one block.
* Also see BlocksListBlockInputStream.
*/
class OneBlockInputStream : public IBlockInputStream
{

View File

@ -1,6 +1,7 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>

View File

@ -2,7 +2,6 @@
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>

View File

@ -150,7 +150,7 @@ void TTLBlockInputStream::readSuffixImpl()
data_part->expired_columns = std::move(empty_columns);
if (rows_removed)
LOG_INFO(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
LOG_DEBUG(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
}
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)

View File

@ -1,4 +0,0 @@
set(SRCS)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream PRIVATE clickhouse_aggregate_functions dbms)

View File

@ -1,132 +0,0 @@
#include <iostream>
#include <iomanip>
#include <pcg_random.hpp>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <DataStreams/BlocksListBlockInputStream.h>
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
int main(int argc, char ** argv)
{
pcg64 rng;
try
{
size_t m = argc >= 2 ? std::stol(argv[1]) : 2;
size_t n = argc >= 3 ? std::stol(argv[2]) : 10;
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
Block block_header;
BlocksList blocks;
for (size_t t = 0; t < m; ++t)
{
Block block;
for (size_t i = 0; i < 2; ++i)
{
ColumnWithTypeAndName column;
column.name = "col" + std::to_string(i + 1);
column.type = std::make_shared<DataTypeInt32>();
auto col = ColumnInt32::create();
auto & vec = col->getData();
vec.resize(n);
for (size_t j = 0; j < n; ++j)
vec[j] = rng() % 10;
column.column = std::move(col);
block.insert(column);
}
if (!block_header)
block_header = block.cloneEmpty();
sortBlock(block, sort_descr);
blocks.emplace_back(std::move(block));
}
auto blocks_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe source(std::make_shared<SourceFromInputStream>(std::move(blocks_stream)));
QueryPipeline pipeline;
pipeline.init(std::move(source));
pipeline.addTransform(std::make_shared<MergeSortingTransform>(pipeline.getHeader(), sort_descr, n, 0, 0, 0, nullptr, 0));
SortDescription sort_descr_final;
sort_descr_final.emplace_back("col1", 1, 1);
sort_descr_final.emplace_back("col2", 1, 1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinishSortingTransform>(header, sort_descr, sort_descr_final, n, 0);
});
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
{
Stopwatch stopwatch;
stopwatch.start();
Block res_block = block_header;
while (Block block = stream->read())
{
for (size_t i = 0; i < block.columns(); ++i)
{
MutableColumnPtr ptr = IColumn::mutate(std::move(res_block.getByPosition(i).column));
ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows());
}
}
if (res_block.rows() != n * m)
throw Exception("Result block size mismatch", ErrorCodes::LOGICAL_ERROR);
const auto & columns = res_block.getColumns();
for (size_t i = 1; i < res_block.rows(); ++i)
for (const auto & col : columns)
{
int res = col->compareAt(i - 1, i, *col, 1);
if (res < 0)
break;
else if (res > 0)
throw Exception("Result stream not sorted", ErrorCodes::LOGICAL_ERROR);
}
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
return -1;
}
return 0;
}

View File

@ -25,7 +25,6 @@ SRCS(
DistinctSortedBlockInputStream.cpp
ExecutionSpeedLimits.cpp
ExpressionBlockInputStream.cpp
FilterBlockInputStream.cpp
finalizeBlock.cpp
IBlockInputStream.cpp
InputStreamFromASTInsertQuery.cpp

View File

@ -70,17 +70,6 @@ std::pair<std::string, std::string> splitName(const std::string & name)
return {{ begin, first_end }, { second_begin, end }};
}
std::string createCommaSeparatedStringFrom(const Names & names)
{
std::ostringstream ss;
if (!names.empty())
{
std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator<std::string>(ss, ", "));
ss << names.back();
}
return ss.str();
}
std::string extractTableName(const std::string & nested_name)
{

View File

@ -13,8 +13,6 @@ namespace Nested
std::pair<std::string, std::string> splitName(const std::string & name);
std::string createCommaSeparatedStringFrom(const Names & names);
/// Returns the prefix of the name to the first '.'. Or the name is unchanged if there is no dot.
std::string extractTableName(const std::string & nested_name);

View File

@ -115,6 +115,10 @@ void DatabaseAtomic::dropTable(const Context &, const String & table_name, bool
table_name_to_path.erase(table_name);
}
tryRemoveSymlink(table_name);
/// Remove the inner table (if any) to avoid deadlock
/// (due to attemp to execute DROP from the worker thread)
if (auto * mv = dynamic_cast<StorageMaterializedView *>(table.get()))
mv->dropInnerTable(no_delay);
/// Notify DatabaseCatalog that table was dropped. It will remove table data in background.
/// Cleanup is performed outside of database to allow easily DROP DATABASE without waiting for cleanup to complete.
DatabaseCatalog::instance().enqueueDroppedTableCleanup(table->getStorageID(), table, table_metadata_path_drop, no_delay);

View File

@ -1,3 +1,4 @@
#pragma once
#include <Databases/DatabaseOnDisk.h>
#include <boost/smart_ptr/atomic_shared_ptr.hpp>
#include <ext/scope_guard.h>

View File

@ -1,3 +1,4 @@
#pragma once
#include "CacheDictionary.h"
#include <Columns/ColumnsNumber.h>

View File

@ -173,7 +173,6 @@ namespace
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <Disks/IDisk.h>

View File

@ -6,6 +6,7 @@
#include <Core/DecimalFunctions.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/IFunctionImpl.h>
#include <Common/Exception.h>
#include <common/DateLUTImpl.h>
@ -115,29 +116,29 @@ template <typename FromDataType, typename ToDataType>
struct CustomWeekTransformImpl
{
template <typename Transform>
static void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, Transform transform = {})
static void execute(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, Transform transform = {})
{
const auto op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>{std::move(transform)};
UInt8 week_mode = DEFAULT_WEEK_MODE;
if (arguments.size() > 1)
{
if (const auto week_mode_column = checkAndGetColumnConst<ColumnUInt8>(block.getByPosition(arguments[1]).column.get()))
if (const auto week_mode_column = checkAndGetColumnConst<ColumnUInt8>(block[arguments[1]].column.get()))
week_mode = week_mode_column->getValue<UInt8>();
}
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 2, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
if (const auto * sources = checkAndGetColumn<typename FromDataType::ColumnType>(source_col.get()))
{
auto col_to = ToDataType::ColumnType::create();
op.vector(sources->getData(), col_to->getData(), week_mode, time_zone);
block.getByPosition(result).column = std::move(col_to);
block[result].column = std::move(col_to);
}
else
{
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function "
"Illegal column " + block[arguments[0]].column->getName() + " of first argument of function "
+ Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -682,25 +683,25 @@ struct Transformer
template <typename FromDataType, typename ToDataType, typename Transform>
struct DateTimeTransformImpl
{
static void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, const Transform & transform = {})
static void execute(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, const Transform & transform = {})
{
using Op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>;
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 1, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
if (const auto * sources = checkAndGetColumn<typename FromDataType::ColumnType>(source_col.get()))
{
auto mutable_result_col = block.getByPosition(result).type->createColumn();
auto mutable_result_col = block[result].type->createColumn();
auto * col_to = assert_cast<typename ToDataType::ColumnType *>(mutable_result_col.get());
Op::vector(sources->getData(), col_to->getData(), time_zone, transform);
block.getByPosition(result).column = std::move(mutable_result_col);
block[result].column = std::move(mutable_result_col);
}
else
{
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
throw Exception("Illegal column " + block[arguments[0]].column->getName()
+ " of first argument of function " + Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -1,3 +1,4 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
@ -92,12 +93,12 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const ColumnPtr column_string = block.getByPosition(arguments[0]).column;
const ColumnPtr column_string = block[arguments[0]].column;
const ColumnString * input = checkAndGetColumn<ColumnString>(column_string.get());
if (!input)
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
"Illegal column " + block[arguments[0]].column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
auto dst_column = ColumnString::create();
@ -165,7 +166,7 @@ public:
dst_data.resize(dst_pos - dst);
block.getByPosition(result).column = std::move(dst_column);
block[result].column = std::move(dst_column);
}
};
}

View File

@ -616,14 +616,14 @@ class FunctionBinaryArithmetic : public IFunction
void executeAggregateMultiply(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
{
ColumnNumbers new_arguments = arguments;
if (WhichDataType(block.getByPosition(new_arguments[1]).type).isAggregateFunction())
if (WhichDataType(block[new_arguments[1]].type).isAggregateFunction())
std::swap(new_arguments[0], new_arguments[1]);
if (!isColumnConst(*block.getByPosition(new_arguments[1]).column))
throw Exception{"Illegal column " + block.getByPosition(new_arguments[1]).column->getName()
if (!isColumnConst(*block[new_arguments[1]].column))
throw Exception{"Illegal column " + block[new_arguments[1]].column->getName()
+ " of argument of aggregation state multiply. Should be integer constant", ErrorCodes::ILLEGAL_COLUMN};
const IColumn & agg_state_column = *block.getByPosition(new_arguments[0]).column;
const IColumn & agg_state_column = *block[new_arguments[0]].column;
bool agg_state_is_const = isColumnConst(agg_state_column);
const ColumnAggregateFunction & column = typeid_cast<const ColumnAggregateFunction &>(
agg_state_is_const ? assert_cast<const ColumnConst &>(agg_state_column).getDataColumn() : agg_state_column);
@ -647,7 +647,7 @@ class FunctionBinaryArithmetic : public IFunction
auto & vec_to = column_to->getData();
auto & vec_from = column_from->getData();
UInt64 m = typeid_cast<const ColumnConst *>(block.getByPosition(new_arguments[1]).column.get())->getValue<UInt64>();
UInt64 m = typeid_cast<const ColumnConst *>(block[new_arguments[1]].column.get())->getValue<UInt64>();
// Since we merge the function states by ourselves, we have to have an
// Arena for this. Pass it to the resulting column so that the arena
@ -674,16 +674,16 @@ class FunctionBinaryArithmetic : public IFunction
}
if (agg_state_is_const)
block.getByPosition(result).column = ColumnConst::create(std::move(column_to), input_rows_count);
block[result].column = ColumnConst::create(std::move(column_to), input_rows_count);
else
block.getByPosition(result).column = std::move(column_to);
block[result].column = std::move(column_to);
}
/// Merge two aggregation states together.
void executeAggregateAddition(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
{
const IColumn & lhs_column = *block.getByPosition(arguments[0]).column;
const IColumn & rhs_column = *block.getByPosition(arguments[1]).column;
const IColumn & lhs_column = *block[arguments[0]].column;
const IColumn & rhs_column = *block[arguments[1]].column;
bool lhs_is_const = isColumnConst(lhs_column);
bool rhs_is_const = isColumnConst(rhs_column);
@ -707,9 +707,9 @@ class FunctionBinaryArithmetic : public IFunction
}
if (lhs_is_const && rhs_is_const)
block.getByPosition(result).column = ColumnConst::create(std::move(column_to), input_rows_count);
block[result].column = ColumnConst::create(std::move(column_to), input_rows_count);
else
block.getByPosition(result).column = std::move(column_to);
block[result].column = std::move(column_to);
}
void executeDateTimeIntervalPlusMinus(Block & block, const ColumnNumbers & arguments,
@ -718,19 +718,19 @@ class FunctionBinaryArithmetic : public IFunction
ColumnNumbers new_arguments = arguments;
/// Interval argument must be second.
if (WhichDataType(block.getByPosition(arguments[1]).type).isDateOrDateTime())
if (WhichDataType(block[arguments[1]].type).isDateOrDateTime())
std::swap(new_arguments[0], new_arguments[1]);
/// Change interval argument type to its representation
Block new_block = block;
new_block.getByPosition(new_arguments[1]).type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
new_block[new_arguments[1]].type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
ColumnsWithTypeAndName new_arguments_with_type_and_name =
{new_block.getByPosition(new_arguments[0]), new_block.getByPosition(new_arguments[1])};
{new_block[new_arguments[0]], new_block[new_arguments[1]]};
auto function = function_builder->build(new_arguments_with_type_and_name);
function->execute(new_block, new_arguments, result, input_rows_count);
block.getByPosition(result).column = new_block.getByPosition(result).column;
block[result].column = new_block[result].column;
}
public:
@ -855,8 +855,8 @@ public:
{
using OpImpl = FixedStringOperationImpl<Op<UInt8, UInt8>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
auto col_left_raw = block[arguments[0]].column.get();
auto col_right_raw = block[arguments[1]].column.get();
if (auto col_left_const = checkAndGetColumnConst<ColumnFixedString>(col_left_raw))
{
if (auto col_right_const = checkAndGetColumnConst<ColumnFixedString>(col_right_raw))
@ -872,7 +872,7 @@ public:
col_right->getChars().data(),
out_chars.data(),
out_chars.size());
block.getByPosition(result).column = ColumnConst::create(std::move(col_res), block.rows());
block[result].column = ColumnConst::create(std::move(col_res), col_left_raw->size());
return true;
}
}
@ -922,7 +922,7 @@ public:
out_chars.size(),
col_left->getN());
}
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
return false;
@ -944,8 +944,8 @@ public:
using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>, ColumnVector<T1>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>, ColumnDecimal<ResultType>, ColumnVector<ResultType>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
auto col_left_raw = block[arguments[0]].column.get();
auto col_right_raw = block[arguments[1]].column.get();
auto col_left_const = checkAndGetColumnConst<ColVecT0>(col_left_raw);
auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw);
@ -981,14 +981,14 @@ public:
OpImplCheck::template constantConstant<dec_a, dec_b>(const_a, const_b, scale_a, scale_b) :
OpImpl::template constantConstant<dec_a, dec_b>(const_a, const_b, scale_a, scale_b);
block.getByPosition(result).column = ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
block[result].column = ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
col_left_const->size(), toField(res, type.getScale()));
return true;
}
col_res = ColVecResult::create(0, type.getScale());
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
vec_res.resize(col_left_raw->size());
if (col_left && col_right)
{
@ -1026,13 +1026,13 @@ public:
if (col_left_const && col_right_const)
{
auto res = OpImpl::constantConstant(col_left_const->template getValue<T0>(), col_right_const->template getValue<T1>());
block.getByPosition(result).column = ResultDataType().createColumnConst(col_left_const->size(), toField(res));
block[result].column = ResultDataType().createColumnConst(col_left_const->size(), toField(res));
return true;
}
col_res = ColVecResult::create();
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
vec_res.resize(col_left_raw->size());
if (col_left && col_right)
{
@ -1050,7 +1050,7 @@ public:
return false;
}
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
return false;
@ -1059,14 +1059,14 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
/// Special case when multiply aggregate function state
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
if (isAggregateMultiply(block[arguments[0]].type, block[arguments[1]].type))
{
executeAggregateMultiply(block, arguments, result, input_rows_count);
return;
}
/// Special case - addition of two aggregate functions states
if (isAggregateAddition(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
if (isAggregateAddition(block[arguments[0]].type, block[arguments[1]].type))
{
executeAggregateAddition(block, arguments, result, input_rows_count);
return;
@ -1074,14 +1074,14 @@ public:
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
if (auto function_builder
= getFunctionForIntervalArithmetic(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type, context))
= getFunctionForIntervalArithmetic(block[arguments[0]].type, block[arguments[1]].type, context))
{
executeDateTimeIntervalPlusMinus(block, arguments, result, input_rows_count, function_builder);
return;
}
const auto & left_argument = block.getByPosition(arguments[0]);
const auto & right_argument = block.getByPosition(arguments[1]);
const auto & left_argument = block[arguments[0]];
const auto & right_argument = block[arguments[1]];
auto * left_generic = left_argument.type.get();
auto * right_generic = right_argument.type.get();
bool valid = castBothTypes(left_generic, right_generic, [&](const auto & left, const auto & right)
@ -1171,6 +1171,7 @@ class FunctionBinaryArithmeticWithConstants : public FunctionBinaryArithmetic<Op
public:
using Base = FunctionBinaryArithmetic<Op, Name, valid_on_default_arguments>;
using Monotonicity = typename Base::Monotonicity;
using Block = typename Base::Block;
static FunctionPtr create(
const ColumnWithTypeAndName & left_,
@ -1194,21 +1195,23 @@ public:
{
if (left.column && isColumnConst(*left.column) && arguments.size() == 1)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.column->cloneResized(input_rows_count), left.type, left.name},
block.getByPosition(arguments[0]),
block.getByPosition(result)};
block[arguments[0]],
block[result]};
Base::executeImpl(block_with_constant, {0, 1}, 2, input_rows_count);
block.getByPosition(result) = block_with_constant.getByPosition(2);
block[result] = block_with_constant[2];
}
else if (right.column && isColumnConst(*right.column) && arguments.size() == 1)
{
Block block_with_constant
= {block.getByPosition(arguments[0]),
ColumnsWithTypeAndName block_with_constant
= {block[arguments[0]],
{right.column->cloneResized(input_rows_count), right.type, right.name},
block.getByPosition(result)};
block[result]};
Base::executeImpl(block_with_constant, {0, 1}, 2, input_rows_count);
block.getByPosition(result) = block_with_constant.getByPosition(2);
block[result] = block_with_constant[2];
}
else
Base::executeImpl(block, arguments, result, input_rows_count);
@ -1242,13 +1245,14 @@ public:
{
auto transform = [&](const Field & point)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.column->cloneResized(1), left.type, left.name},
{right.type->createColumnConst(1, point), right.type, right.name},
{nullptr, return_type, ""}};
Base::executeImpl(block_with_constant, {0, 1}, 2, 1);
Field point_transformed;
block_with_constant.getByPosition(2).column->get(0, point_transformed);
block_with_constant[2].column->get(0, point_transformed);
return point_transformed;
};
transform(left_point);
@ -1277,13 +1281,14 @@ public:
{
auto transform = [&](const Field & point)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.type->createColumnConst(1, point), left.type, left.name},
{right.column->cloneResized(1), right.type, right.name},
{nullptr, return_type, ""}};
Base::executeImpl(block_with_constant, {0, 1}, 2, 1);
Field point_transformed;
block_with_constant.getByPosition(2).column->get(0, point_transformed);
block_with_constant[2].column->get(0, point_transformed);
return point_transformed;
};

View File

@ -56,7 +56,7 @@ public:
void executeImpl(Block & block , const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const auto value_col = block.getByPosition(arguments.front()).column.get();
const auto value_col = block[arguments.front()].column.get();
if (!execute<UInt8>(block, arguments, result, value_col)
&& !execute<UInt16>(block, arguments, result, value_col)
@ -98,7 +98,7 @@ private:
out[i] = Impl::apply(val[i], mask[i]);
}
block.getByPosition(result).column = std::move(out_col);
block[result].column = std::move(out_col);
return true;
}
else if (const auto value_col_const = checkAndGetColumnConst<ColumnVector<T>>(value_col_untyped))
@ -110,7 +110,7 @@ private:
if (is_const)
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, toField(Impl::apply(val, const_mask)));
block[result].column = block[result].type->createColumnConst(size, toField(Impl::apply(val, const_mask)));
}
else
{
@ -122,7 +122,7 @@ private:
for (const auto i : ext::range(0, size))
out[i] = Impl::apply(val, mask[i]);
block.getByPosition(result).column = std::move(out_col);
block[result].column = std::move(out_col);
}
return true;
@ -139,7 +139,7 @@ private:
for (const auto i : ext::range(1, arguments.size()))
{
if (auto pos_col_const = checkAndGetColumnConst<ColumnVector<ValueType>>(block.getByPosition(arguments[i]).column.get()))
if (auto pos_col_const = checkAndGetColumnConst<ColumnVector<ValueType>>(block[arguments[i]].column.get()))
{
const auto pos = pos_col_const->getUInt(0);
if (pos < 8 * sizeof(ValueType))
@ -162,7 +162,7 @@ private:
for (const auto i : ext::range(1, arguments.size()))
{
const auto pos_col = block.getByPosition(arguments[i]).column.get();
const auto pos_col = block[arguments[i]].column.get();
if (!addToMaskImpl<UInt8>(mask, pos_col)
&& !addToMaskImpl<UInt16>(mask, pos_col)

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -97,7 +98,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -114,7 +115,7 @@ public:
}
else
throw Exception(
"Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
"Illegal type " + block[arguments[0]].type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <common/DateLUTImpl.h>
#include <DataTypes/DataTypeDate.h>
@ -304,7 +305,7 @@ private:
template <typename FromDataType, typename ToDataType, typename Transform>
struct DateTimeAddIntervalImpl
{
static void execute(Transform transform, Block & block, const ColumnNumbers & arguments, size_t result)
static void execute(Transform transform, ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result)
{
using FromValueType = typename FromDataType::FieldType;
using FromColumnType = typename FromDataType::ColumnType;
@ -314,14 +315,14 @@ struct DateTimeAddIntervalImpl
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 2, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
auto result_col = block.getByPosition(result).type->createColumn();
auto result_col = block[result].type->createColumn();
auto col_to = assert_cast<ToColumnType *>(result_col.get());
if (const auto * sources = checkAndGetColumn<FromColumnType>(source_col.get()))
{
const IColumn & delta_column = *block.getByPosition(arguments[1]).column;
const IColumn & delta_column = *block[arguments[1]].column;
if (const auto * delta_const_column = typeid_cast<const ColumnConst *>(&delta_column))
op.vectorConstant(sources->getData(), col_to->getData(), delta_const_column->getInt(0), time_zone);
@ -333,16 +334,16 @@ struct DateTimeAddIntervalImpl
op.constantVector(
sources_const->template getValue<FromValueType>(),
col_to->getData(),
*block.getByPosition(arguments[1]).column, time_zone);
*block[arguments[1]].column, time_zone);
}
else
{
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
throw Exception("Illegal column " + block[arguments[0]].column->getName()
+ " of first argument of function " + Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}
block.getByPosition(result).column = std::move(result_col);
block[result].column = std::move(result_col);
}
};
@ -464,7 +465,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -483,7 +484,7 @@ public:
Transform{datetime64_type->getScale()}, block, arguments, result);
}
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of first argument of function " + getName(),
throw Exception("Illegal type " + block[arguments[0]].type->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunctionImpl.h>
@ -96,7 +97,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -110,7 +111,7 @@ public:
DateTimeTransformImpl<DataTypeDateTime64, ToDataType, decltype(transformer)>::execute(block, arguments, result, input_rows_count, transformer);
}
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
throw Exception("Illegal type " + block[arguments[0]].type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

View File

@ -36,7 +36,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) const override
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(
block[result].column = block[result].type->createColumnConst(
input_rows_count, getFQDNOrHostName())->convertToFullColumnIfConst();
}
};

View File

@ -2,6 +2,7 @@
#include <Functions/IFunctionAdaptors.h>
#include <Common/IFactoryWithAliases.h>
#include <Interpreters/Context.h>
#include <functional>
#include <memory>

View File

@ -51,14 +51,14 @@ Columns convertConstTupleToConstantElements(const ColumnConst & column)
}
static Block createBlockWithNestedColumnsImpl(const Block & block, const std::unordered_set<size_t> & args)
static ColumnsWithTypeAndName createBlockWithNestedColumnsImpl(const ColumnsWithTypeAndName & columns, const std::unordered_set<size_t> & args)
{
Block res;
size_t columns = block.columns();
ColumnsWithTypeAndName res;
size_t num_columns = columns.size();
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
const auto & col = block.getByPosition(i);
const auto & col = columns[i];
if (args.count(i) && col.type->isNullable())
{
@ -66,40 +66,40 @@ static Block createBlockWithNestedColumnsImpl(const Block & block, const std::un
if (!col.column)
{
res.insert({nullptr, nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{nullptr, nested_type, col.name});
}
else if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*col.column))
{
const auto & nested_col = nullable->getNestedColumnPtr();
res.insert({nested_col, nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{nested_col, nested_type, col.name});
}
else if (const auto * const_column = checkAndGetColumn<ColumnConst>(*col.column))
{
const auto & nested_col = checkAndGetColumn<ColumnNullable>(const_column->getDataColumn())->getNestedColumnPtr();
res.insert({ ColumnConst::create(nested_col, col.column->size()), nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{ ColumnConst::create(nested_col, col.column->size()), nested_type, col.name});
}
else
throw Exception("Illegal column for DataTypeNullable", ErrorCodes::ILLEGAL_COLUMN);
}
else
res.insert(col);
res.emplace_back(col);
}
return res;
}
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args)
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args)
{
std::unordered_set<size_t> args_set(args.begin(), args.end());
return createBlockWithNestedColumnsImpl(block, args_set);
return createBlockWithNestedColumnsImpl(columns, args_set);
}
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args, size_t result)
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args, size_t result)
{
std::unordered_set<size_t> args_set(args.begin(), args.end());
args_set.insert(result);
return createBlockWithNestedColumnsImpl(block, args_set);
return createBlockWithNestedColumnsImpl(columns, args_set);
}
void validateArgumentType(const IFunction & func, const DataTypes & arguments,

View File

@ -85,10 +85,10 @@ Columns convertConstTupleToConstantElements(const ColumnConst & column);
/// Returns the copy of a given block in which each column specified in
/// the "arguments" parameter is replaced with its respective nested
/// column if it is nullable.
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args);
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args);
/// Similar function as above. Additionally transform the result type if needed.
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args, size_t result);
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args, size_t result);
/// Checks argument type at specified index with predicate.
/// throws if there is no argument at specified index or if predicate returns false.

View File

@ -19,19 +19,19 @@ namespace ErrorCodes
template <bool or_null>
void ExecutableFunctionJoinGet<or_null>::execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t)
{
Block keys;
ColumnsWithTypeAndName keys;
for (size_t i = 2; i < arguments.size(); ++i)
{
auto key = block.getByPosition(arguments[i]);
keys.insert(std::move(key));
auto key = block[arguments[i]];
keys.emplace_back(std::move(key));
}
block.getByPosition(result) = join->joinGet(keys, result_block);
block[result] = join->joinGet(keys, result_block);
}
template <bool or_null>
ExecutableFunctionImplPtr FunctionJoinGet<or_null>::prepare(const Block &, const ColumnNumbers &, size_t) const
{
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(join, Block{{return_type->createColumn(), return_type, attr_name}});
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(join, DB::Block{{return_type->createColumn(), return_type, attr_name}});
}
static auto getJoin(const ColumnsWithTypeAndName & arguments, const Context & context)

View File

@ -1,6 +1,8 @@
#pragma once
#include <Functions/IFunctionImpl.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Core/Block.h>
namespace DB
{
@ -13,7 +15,7 @@ template <bool or_null>
class ExecutableFunctionJoinGet final : public IExecutableFunctionImpl
{
public:
ExecutableFunctionJoinGet(HashJoinPtr join_, const Block & result_block_)
ExecutableFunctionJoinGet(HashJoinPtr join_, const DB::Block & result_block_)
: join(std::move(join_)), result_block(result_block_) {}
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
@ -28,7 +30,7 @@ public:
private:
HashJoinPtr join;
Block result_block;
DB::Block result_block;
};
template <bool or_null>

View File

@ -95,7 +95,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -157,7 +157,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
if (const auto right_arg_typed = checkAndGetColumnConst<ColumnVector<RightType>>(right_arg))
@ -200,7 +200,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -209,8 +209,8 @@ private:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & col_left = block.getByPosition(arguments[0]);
const ColumnWithTypeAndName & col_right = block.getByPosition(arguments[1]);
const ColumnWithTypeAndName & col_left = block[arguments[0]];
const ColumnWithTypeAndName & col_right = block[arguments[1]];
auto call = [&](const auto & types) -> bool
{

View File

@ -27,7 +27,7 @@ private:
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) const override
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(input_rows_count, Impl::value);
block[result].column = block[result].type->createColumnConst(input_rows_count, Impl::value);
}
};

View File

@ -124,7 +124,7 @@ private:
executeInIterations(src_data.data(), dst_data.data(), size);
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -144,7 +144,7 @@ private:
executeInIterations(dst_data.data(), dst_data.data(), size);
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -152,7 +152,7 @@ private:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & col = block.getByPosition(arguments[0]);
const ColumnWithTypeAndName & col = block[arguments[0]];
auto call = [&](const auto & types) -> bool
{

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesNumber.h>
@ -47,7 +48,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const auto in = block.getByPosition(arguments.front()).column.get();
const auto in = block[arguments.front()].column.get();
if ( !execute<UInt8>(block, in, result)
&& !execute<UInt16>(block, in, result)
@ -77,7 +78,7 @@ public:
for (const auto i : ext::range(0, size))
out_data[i] = Impl::execute(in_data[i]);
block.getByPosition(result).column = std::move(out);
block[result].column = std::move(out);
return true;
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/FunctionHelpers.h>
#include <Functions/GatherUtils/GatherUtils.h>
#include <Functions/GatherUtils/Sources.h>
@ -64,8 +65,8 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IColumn * haystack_column = block.getByPosition(arguments[0]).column.get();
const IColumn * needle_column = block.getByPosition(arguments[1]).column.get();
const IColumn * haystack_column = block[arguments[0]].column.get();
const IColumn * needle_column = block[arguments[1]].column.get();
auto col_res = ColumnVector<UInt8>::create();
typename ColumnVector<UInt8>::Container & vec_res = col_res->getData();
@ -83,7 +84,7 @@ public:
else
throw Exception("Illegal combination of columns as arguments of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
private:
@ -158,7 +159,7 @@ public:
#endif
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
void executeImpl(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
selector.selectAndExecute(block, arguments, result, input_rows_count);
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunctionImpl.h>
@ -51,7 +52,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
const ColumnPtr column = block[arguments[0]].column;
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column.get()))
{
auto col_res = ColumnVector<ResultType>::create();
@ -60,7 +61,7 @@ public:
vec_res.resize(col->size());
Impl::vector(col->getChars(), col->getOffsets(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column.get()))
{
@ -69,7 +70,7 @@ public:
ResultType res = 0;
Impl::vectorFixedToConstant(col_fixed->getChars(), col_fixed->getN(), res);
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(col_fixed->size(), toField(res));
block[result].column = block[result].type->createColumnConst(col_fixed->size(), toField(res));
}
else
{
@ -79,7 +80,7 @@ public:
vec_res.resize(col_fixed->size());
Impl::vectorFixedToVector(col_fixed->getChars(), col_fixed->getN(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
}
else if (const ColumnArray * col_arr = checkAndGetColumn<ColumnArray>(column.get()))
@ -90,10 +91,10 @@ public:
vec_res.resize(col_arr->size());
Impl::array(col_arr->getOffsets(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(),
throw Exception("Illegal column " + block[arguments[0]].column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};

View File

@ -54,15 +54,15 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnPtr column_src = block.getByPosition(arguments[0]).column;
const ColumnPtr column_needle = block.getByPosition(arguments[1]).column;
const ColumnPtr column_replacement = block.getByPosition(arguments[2]).column;
const ColumnPtr column_src = block[arguments[0]].column;
const ColumnPtr column_needle = block[arguments[1]].column;
const ColumnPtr column_replacement = block[arguments[2]].column;
if (!isColumnConst(*column_needle) || !isColumnConst(*column_replacement))
throw Exception("2nd and 3rd arguments of function " + getName() + " must be constants.", ErrorCodes::ILLEGAL_COLUMN);
const IColumn * c1 = block.getByPosition(arguments[1]).column.get();
const IColumn * c2 = block.getByPosition(arguments[2]).column.get();
const IColumn * c1 = block[arguments[1]].column.get();
const IColumn * c2 = block[arguments[2]].column.get();
const ColumnConst * c1_const = typeid_cast<const ColumnConst *>(c1);
const ColumnConst * c2_const = typeid_cast<const ColumnConst *>(c2);
String needle = c1_const->getValue<String>();
@ -75,17 +75,17 @@ public:
{
auto col_res = ColumnString::create();
Impl::vector(col->getChars(), col->getOffsets(), needle, replacement, col_res->getChars(), col_res->getOffsets());
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column_src.get()))
{
auto col_res = ColumnString::create();
Impl::vectorFixed(col_fixed->getChars(), col_fixed->getN(), needle, replacement, col_res->getChars(), col_res->getOffsets());
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
"Illegal column " + block[arguments[0]].column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>
@ -35,7 +36,7 @@ public:
return 1;
}
bool isInjective(const Block &) const override
bool isInjective(const ColumnsWithTypeAndName &) const override
{
return is_injective;
}
@ -53,22 +54,22 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnPtr column = block.getByPosition(arguments[0]).column;
const ColumnPtr column = block[arguments[0]].column;
if (const ColumnString * col = checkAndGetColumn<ColumnString>(column.get()))
{
auto col_res = ColumnString::create();
Impl::vector(col->getChars(), col->getOffsets(), col_res->getChars(), col_res->getOffsets());
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else if (const ColumnFixedString * col_fixed = checkAndGetColumn<ColumnFixedString>(column.get()))
{
auto col_res = ColumnFixedString::create(col_fixed->getN());
Impl::vectorFixed(col_fixed->getChars(), col_fixed->getN(), col_res->getChars());
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
else
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of argument of function " + getName(),
"Illegal column " + block[arguments[0]].column->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
}
};

View File

@ -117,7 +117,7 @@ public:
}
size_t getNumberOfArguments() const override { return 1; }
bool isInjective(const Block &) const override { return is_injective; }
bool isInjective(const ColumnsWithTypeAndName &) const override { return is_injective; }
bool useDefaultImplementationForConstants() const override { return true; }
@ -156,7 +156,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
bool valid = castType(block.getByPosition(arguments[0]).type.get(), [&](const auto & type)
bool valid = castType(block[arguments[0]].type.get(), [&](const auto & type)
{
using DataType = std::decay_t<decltype(type)>;
@ -164,13 +164,13 @@ public:
{
if constexpr (allow_fixed_string)
{
if (auto col = checkAndGetColumn<ColumnFixedString>(block.getByPosition(arguments[0]).column.get()))
if (auto col = checkAndGetColumn<ColumnFixedString>(block[arguments[0]].column.get()))
{
auto col_res = ColumnFixedString::create(col->getN());
auto & vec_res = col_res->getChars();
vec_res.resize(col->size() * col->getN());
FixedStringUnaryOperationImpl<Op<UInt8>>::vector(col->getChars(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
}
@ -180,13 +180,13 @@ public:
using T0 = typename DataType::FieldType;
if constexpr (allow_decimal)
{
if (auto col = checkAndGetColumn<ColumnDecimal<T0>>(block.getByPosition(arguments[0]).column.get()))
if (auto col = checkAndGetColumn<ColumnDecimal<T0>>(block[arguments[0]].column.get()))
{
auto col_res = ColumnDecimal<typename Op<T0>::ResultType>::create(0, type.getScale());
auto & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
UnaryOperationImpl<T0, Op<T0>>::vector(col->getData(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
}
@ -194,13 +194,13 @@ public:
else
{
using T0 = typename DataType::FieldType;
if (auto col = checkAndGetColumn<ColumnVector<T0>>(block.getByPosition(arguments[0]).column.get()))
if (auto col = checkAndGetColumn<ColumnVector<T0>>(block[arguments[0]].column.get()))
{
auto col_res = ColumnVector<typename Op<T0>::ResultType>::create();
auto & vec_res = col_res->getData();
vec_res.resize(col->getData().size());
UnaryOperationImpl<T0, Op<T0>>::vector(col->getData(), vec_res);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
}

View File

@ -70,8 +70,8 @@ public:
using SourceColumnType = typename SourceDataType::ColumnType;
using ResultColumnType = typename ResultDataType::ColumnType;
const auto & src = block.getByPosition(arguments[0]);
auto & res = block.getByPosition(result);
const auto & src = block[arguments[0]];
auto & res = block[result];
const auto & col = *src.column;
const SourceColumnType * source_col_typed = checkAndGetColumn<SourceColumnType>(col);

Some files were not shown because too many files have changed in this diff Show More