Merge branch 'master' into storage-read-query-plan

This commit is contained in:
Nikolai Kochetov 2020-10-12 13:12:39 +03:00
commit 7e58f99f64
466 changed files with 3906 additions and 2873 deletions

View File

@ -1,3 +1,5 @@
#pragma once
#include <string>
#include <common/types.h>

View File

@ -1,6 +1,9 @@
/// Original is here https://github.com/cerevra/int
#pragma once
/// Original is here https://github.com/cerevra/int
/// Distributed under the Boost Software License, Version 1.0.
/// (See at http://www.boost.org/LICENSE_1_0.txt)
#include "throwError.h"
namespace wide

View File

@ -1,3 +1,5 @@
#pragma once
#include <optional>
#include <string>
#include <Poco/AutoPtr.h>

View File

@ -21,8 +21,8 @@ void Pool::Entry::incrementRefCount()
{
if (!data)
return;
++data->ref_count;
if (data->ref_count == 1)
/// First reference, initialize thread
if (data->ref_count.fetch_add(1) == 0)
mysql_thread_init();
}
@ -30,12 +30,10 @@ void Pool::Entry::decrementRefCount()
{
if (!data)
return;
if (data->ref_count > 0)
{
--data->ref_count;
if (data->ref_count == 0)
mysql_thread_end();
}
/// We were the last user of this thread, deinitialize it
if (data->ref_count.fetch_sub(1) == 1)
mysql_thread_end();
}
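The change above relies on the atomicity of fetch-and-add: exactly one thread observes the 0 → 1 transition and runs the one-time initialization, and exactly one observes the 1 → 0 transition and tears it down. A minimal standalone sketch of the same pattern, with hypothetical `init`/`deinit` hooks standing in for `mysql_thread_init`/`mysql_thread_end`:

``` cpp
#include <atomic>

struct RefCounted
{
    std::atomic<int> ref_count{0};

    void acquire()
    {
        // fetch_add returns the previous value, so exactly one caller
        // observes 0 and performs the one-time initialization.
        if (ref_count.fetch_add(1) == 0)
            init();   // stands in for mysql_thread_init()
    }

    void release()
    {
        // Symmetrically, exactly one caller observes 1 and deinitializes.
        if (ref_count.fetch_sub(1) == 1)
            deinit(); // stands in for mysql_thread_end()
    }

    void init() {}
    void deinit() {}
};
```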

View File

@ -3,6 +3,7 @@
#include <list>
#include <memory>
#include <mutex>
#include <atomic>
#include <Poco/Exception.h>
#include <mysqlxx/Connection.h>
@ -35,7 +36,9 @@ protected:
struct Connection
{
mysqlxx::Connection conn;
int ref_count = 0;
/// Ref count is modified in the constructor/destructor of Entry
/// but also read in pool code.
std::atomic<int> ref_count = 0;
};
public:

View File

@ -1,9 +1,9 @@
# These strings are autochanged from release_lib.sh:
SET(VERSION_REVISION 54441)
SET(VERSION_REVISION 54442)
SET(VERSION_MAJOR 20)
SET(VERSION_MINOR 10)
SET(VERSION_MINOR 11)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 11a247d2f42010c1a17bf678c3e00a4bc89b23f8)
SET(VERSION_DESCRIBE v20.10.1.1-prestable)
SET(VERSION_STRING 20.10.1.1)
SET(VERSION_GITHASH 76a04fb4b4f6cd27ad999baf6dc9a25e88851c42)
SET(VERSION_DESCRIBE v20.11.1.1-prestable)
SET(VERSION_STRING 20.11.1.1)
# end of autochange

View File

@ -26,8 +26,8 @@ if (NOT USE_INTERNAL_ODBC_LIBRARY)
find_path (INCLUDE_ODBC sql.h)
if(LIBRARY_ODBC AND INCLUDE_ODBC)
add_library (unixodbc UNKNOWN IMPORTED)
set_target_properties (unixodbc PROPERTIES IMPORTED_LOCATION ${LIBRARY_ODBC})
add_library (unixodbc INTERFACE)
set_target_properties (unixodbc PROPERTIES INTERFACE_LINK_LIBRARIES ${LIBRARY_ODBC})
set_target_properties (unixodbc PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_ODBC})
set_target_properties (unixodbc PROPERTIES INTERFACE_COMPILE_DEFINITIONS USE_ODBC=1)

View File

@ -31,6 +31,7 @@ if (COMPILER_CLANG)
add_warning(pedantic)
no_warning(vla-extension)
no_warning(zero-length-array)
no_warning(c11-extensions)
add_warning(comma)
add_warning(conditional-uninitialized)
@ -57,7 +58,10 @@ if (COMPILER_CLANG)
add_warning(unused-exception-parameter)
add_warning(unused-macros)
add_warning(unused-member-function)
add_warning(zero-as-null-pointer-constant)
# XXX: libstdc++ has some of these for 3way compare
if (USE_LIBCXX)
add_warning(zero-as-null-pointer-constant)
endif()
if (WEVERYTHING)
add_warning(everything)
@ -169,6 +173,11 @@ elseif (COMPILER_GCC)
add_cxx_compile_options(-Wunused)
# Warn if vector operation is not implemented via SIMD capabilities of the architecture
add_cxx_compile_options(-Wvector-operation-performance)
# XXX: libstdc++ has some of these for 3way compare
if (USE_LIBCXX)
# Warn when a literal 0 is used as null pointer constant.
add_cxx_compile_options(-Wzero-as-null-pointer-constant)
endif()
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 10)
# XXX: gcc10 stuck with this option while compiling GatherUtils code

View File

@ -26,8 +26,8 @@ if (NOT USE_INTERNAL_HYPERSCAN_LIBRARY)
if (LIBRARY_HYPERSCAN AND INCLUDE_HYPERSCAN)
set (EXTERNAL_HYPERSCAN_LIBRARY_FOUND 1)
add_library (hyperscan UNKNOWN IMPORTED GLOBAL)
set_target_properties (hyperscan PROPERTIES IMPORTED_LOCATION ${LIBRARY_HYPERSCAN})
add_library (hyperscan INTERFACE)
set_target_properties (hyperscan PROPERTIES INTERFACE_LINK_LIBRARIES ${LIBRARY_HYPERSCAN})
set_target_properties (hyperscan PROPERTIES INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_HYPERSCAN})
set_property(TARGET hyperscan APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_HYPERSCAN=1)
else ()

contrib/jemalloc vendored

@ -1 +1 @@
Subproject commit 026764f19995c53583ab25a3b9c06a2fd74e4689
Subproject commit 93e27e435cac846028da20cd9b0841fbc9110bd2

View File

@ -9,10 +9,6 @@ else()
endif ()
if (NOT ENABLE_JEMALLOC)
if(USE_INTERNAL_JEMALLOC_LIBRARY)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't use internal jemalloc with ENABLE_JEMALLOC=OFF")
endif()
add_library(jemalloc INTERFACE)
target_compile_definitions(jemalloc INTERFACE USE_JEMALLOC=0)
@ -24,162 +20,116 @@ if (NOT OS_LINUX)
message (WARNING "jemalloc support on non-linux is EXPERIMENTAL")
endif()
option (USE_INTERNAL_JEMALLOC_LIBRARY "Use internal jemalloc library" ${NOT_UNBUNDLED})
if (OS_LINUX)
# ThreadPool selects jobs randomly, so there can be threads that performed a
# memory-heavy task earlier and will be inactive for some time; until they
# become active again, their memory will not be freed, since by default each
# thread has its own arena, and there may be up to 4*CPU arenas (see
# opt.narenas description).
#
# By enabling percpu_arena, the number of arenas is limited to the number of
# CPUs, so this problem should go away.
#
# muzzy_decay_ms -- use MADV_FREE when available on newer Linuxes, to
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache
set (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE "" CACHE STRING "Change default configuration string of JEMalloc" )
if (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE)
set (JEMALLOC_CONFIG_MALLOC_CONF "${JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE}")
endif()
message (STATUS "jemalloc malloc_conf: ${JEMALLOC_CONFIG_MALLOC_CONF}")
if (NOT USE_INTERNAL_JEMALLOC_LIBRARY)
find_library(LIBRARY_JEMALLOC jemalloc)
find_path(INCLUDE_JEMALLOC jemalloc/jemalloc.h)
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/jemalloc")
if (LIBRARY_JEMALLOC AND INCLUDE_JEMALLOC)
set(EXTERNAL_JEMALLOC_LIBRARY_FOUND 1)
set(THREADS_PREFER_PTHREAD_FLAG ON)
find_package(Threads)
set (CMAKE_REQUIRED_LIBRARIES ${LIBRARY_JEMALLOC} Threads::Threads "dl")
set (CMAKE_REQUIRED_INCLUDES ${INCLUDE_JEMALLOC})
check_cxx_source_compiles (
"
#include <jemalloc/jemalloc.h>
int main() {
free(mallocx(1, 0));
}
"
EXTERNAL_JEMALLOC_LIBRARY_WORKS
)
if (EXTERNAL_JEMALLOC_LIBRARY_WORKS)
add_library (jemalloc STATIC IMPORTED)
set_property (TARGET jemalloc PROPERTY IMPORTED_LOCATION ${LIBRARY_JEMALLOC})
set_property (TARGET jemalloc PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_JEMALLOC})
set_property (TARGET jemalloc PROPERTY INTERFACE_LINK_LIBRARIES Threads::Threads dl)
else()
message (${RECONFIGURE_MESSAGE_LEVEL} "External jemalloc is unusable: ${LIBRARY_JEMALLOC} ${INCLUDE_JEMALLOC}")
endif ()
else()
set(EXTERNAL_JEMALLOC_LIBRARY_FOUND 0)
message (${RECONFIGURE_MESSAGE_LEVEL} "Can't find system jemalloc")
endif()
set (SRCS
${LIBRARY_DIR}/src/arena.c
${LIBRARY_DIR}/src/background_thread.c
${LIBRARY_DIR}/src/base.c
${LIBRARY_DIR}/src/bin.c
${LIBRARY_DIR}/src/bitmap.c
${LIBRARY_DIR}/src/ckh.c
${LIBRARY_DIR}/src/ctl.c
${LIBRARY_DIR}/src/div.c
${LIBRARY_DIR}/src/extent.c
${LIBRARY_DIR}/src/extent_dss.c
${LIBRARY_DIR}/src/extent_mmap.c
${LIBRARY_DIR}/src/hash.c
${LIBRARY_DIR}/src/hook.c
${LIBRARY_DIR}/src/jemalloc.c
${LIBRARY_DIR}/src/large.c
${LIBRARY_DIR}/src/log.c
${LIBRARY_DIR}/src/malloc_io.c
${LIBRARY_DIR}/src/mutex.c
${LIBRARY_DIR}/src/mutex_pool.c
${LIBRARY_DIR}/src/nstime.c
${LIBRARY_DIR}/src/pages.c
${LIBRARY_DIR}/src/prng.c
${LIBRARY_DIR}/src/prof.c
${LIBRARY_DIR}/src/rtree.c
${LIBRARY_DIR}/src/sc.c
${LIBRARY_DIR}/src/stats.c
${LIBRARY_DIR}/src/sz.c
${LIBRARY_DIR}/src/tcache.c
${LIBRARY_DIR}/src/test_hooks.c
${LIBRARY_DIR}/src/ticker.c
${LIBRARY_DIR}/src/tsd.c
${LIBRARY_DIR}/src/witness.c
${LIBRARY_DIR}/src/safety_check.c
)
if (OS_DARWIN)
list(APPEND SRCS ${LIBRARY_DIR}/src/zone.c)
endif ()
if (NOT EXTERNAL_JEMALLOC_LIBRARY_FOUND OR NOT EXTERNAL_JEMALLOC_LIBRARY_WORKS)
set(USE_INTERNAL_JEMALLOC_LIBRARY 1)
add_library(jemalloc ${SRCS})
target_include_directories(jemalloc PRIVATE ${LIBRARY_DIR}/include)
target_include_directories(jemalloc SYSTEM PUBLIC include)
if (OS_LINUX)
# ThreadPool selects jobs randomly, so there can be threads that performed a
# memory-heavy task earlier and will be inactive for some time; until they
# become active again, their memory will not be freed, since by default each
# thread has its own arena, and there may be up to 4*CPU arenas (see
# opt.narenas description).
#
# By enabling percpu_arena, the number of arenas is limited to the number of
# CPUs, so this problem should go away.
#
# muzzy_decay_ms -- use MADV_FREE when available on newer Linuxes, to
# avoid spurious latencies and additional work associated with
# MADV_DONTNEED. See
# https://github.com/ClickHouse/ClickHouse/issues/11121 for motivation.
set (JEMALLOC_CONFIG_MALLOC_CONF "percpu_arena:percpu,oversize_threshold:0,muzzy_decay_ms:10000")
else()
set (JEMALLOC_CONFIG_MALLOC_CONF "oversize_threshold:0,muzzy_decay_ms:10000")
endif()
# CACHE variable is empty, to allow changing defaults without necessity
# to purge cache
set (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE "" CACHE STRING "Change default configuration string of JEMalloc" )
if (JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE)
set (JEMALLOC_CONFIG_MALLOC_CONF "${JEMALLOC_CONFIG_MALLOC_CONF_OVERRIDE}")
endif()
message (STATUS "jemalloc malloc_conf: ${JEMALLOC_CONFIG_MALLOC_CONF}")
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/jemalloc")
set (SRCS
${LIBRARY_DIR}/src/arena.c
${LIBRARY_DIR}/src/background_thread.c
${LIBRARY_DIR}/src/base.c
${LIBRARY_DIR}/src/bin.c
${LIBRARY_DIR}/src/bitmap.c
${LIBRARY_DIR}/src/ckh.c
${LIBRARY_DIR}/src/ctl.c
${LIBRARY_DIR}/src/div.c
${LIBRARY_DIR}/src/extent.c
${LIBRARY_DIR}/src/extent_dss.c
${LIBRARY_DIR}/src/extent_mmap.c
${LIBRARY_DIR}/src/hash.c
${LIBRARY_DIR}/src/hook.c
${LIBRARY_DIR}/src/jemalloc.c
${LIBRARY_DIR}/src/large.c
${LIBRARY_DIR}/src/log.c
${LIBRARY_DIR}/src/malloc_io.c
${LIBRARY_DIR}/src/mutex.c
${LIBRARY_DIR}/src/mutex_pool.c
${LIBRARY_DIR}/src/nstime.c
${LIBRARY_DIR}/src/pages.c
${LIBRARY_DIR}/src/prng.c
${LIBRARY_DIR}/src/prof.c
${LIBRARY_DIR}/src/rtree.c
${LIBRARY_DIR}/src/sc.c
${LIBRARY_DIR}/src/stats.c
${LIBRARY_DIR}/src/sz.c
${LIBRARY_DIR}/src/tcache.c
${LIBRARY_DIR}/src/test_hooks.c
${LIBRARY_DIR}/src/ticker.c
${LIBRARY_DIR}/src/tsd.c
${LIBRARY_DIR}/src/witness.c
${LIBRARY_DIR}/src/safety_check.c
)
if (OS_DARWIN)
list(APPEND SRCS ${LIBRARY_DIR}/src/zone.c)
endif ()
add_library(jemalloc ${SRCS})
target_include_directories(jemalloc PRIVATE ${LIBRARY_DIR}/include)
target_include_directories(jemalloc SYSTEM PUBLIC include)
set (JEMALLOC_INCLUDE_PREFIX)
# OS_
if (OS_LINUX)
set (JEMALLOC_INCLUDE_PREFIX "include_linux")
elseif (OS_FREEBSD)
set (JEMALLOC_INCLUDE_PREFIX "include_freebsd")
elseif (OS_DARWIN)
set (JEMALLOC_INCLUDE_PREFIX "include_darwin")
else ()
message (FATAL_ERROR "internal jemalloc: This OS is not supported")
endif ()
# ARCH_
if (ARCH_AMD64)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_x86_64")
elseif (ARCH_ARM)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_aarch64")
else ()
message (FATAL_ERROR "internal jemalloc: This arch is not supported")
endif ()
configure_file(${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h.in
${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h)
target_include_directories(jemalloc SYSTEM PRIVATE
${CMAKE_CURRENT_BINARY_DIR}/${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(jemalloc PRIVATE -D_GNU_SOURCE)
set (USE_INTERNAL_JEMALLOC_LIBRARY 1)
set (JEMALLOC_INCLUDE_PREFIX)
# OS_
if (OS_LINUX)
set (JEMALLOC_INCLUDE_PREFIX "include_linux")
elseif (OS_FREEBSD)
set (JEMALLOC_INCLUDE_PREFIX "include_freebsd")
elseif (OS_DARWIN)
set (JEMALLOC_INCLUDE_PREFIX "include_darwin")
else ()
message (FATAL_ERROR "internal jemalloc: This OS is not supported")
endif ()
# ARCH_
if (ARCH_AMD64)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_x86_64")
elseif (ARCH_ARM)
set(JEMALLOC_INCLUDE_PREFIX "${JEMALLOC_INCLUDE_PREFIX}_aarch64")
else ()
message (FATAL_ERROR "internal jemalloc: This arch is not supported")
endif ()
configure_file(${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h.in
${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal/jemalloc_internal_defs.h)
target_include_directories(jemalloc SYSTEM PRIVATE
${CMAKE_CURRENT_BINARY_DIR}/${JEMALLOC_INCLUDE_PREFIX}/jemalloc/internal)
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_NO_PRIVATE_NAMESPACE)
if (CMAKE_BUILD_TYPE_UC STREQUAL "DEBUG")
target_compile_definitions(jemalloc PRIVATE -DJEMALLOC_DEBUG=1 -DJEMALLOC_PROF=1)
if (USE_UNWIND)
target_compile_definitions (jemalloc PRIVATE -DJEMALLOC_PROF_LIBUNWIND=1)
target_link_libraries (jemalloc PRIVATE unwind)
endif ()
endif ()
target_compile_options(jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(jemalloc PRIVATE -D_GNU_SOURCE)
set_property(TARGET jemalloc APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_JEMALLOC=1)
if (MAKE_STATIC_LIBRARIES)

contrib/libhdfs3 vendored

@ -1 +1 @@
Subproject commit 1b666578c85094306b061352078022f6350bfab8
Subproject commit 30552ac527f2c14070d834e171493b2e7f662375

View File

@ -6,8 +6,8 @@ if (NOT USE_INTERNAL_LZ4_LIBRARY)
if (LIBRARY_LZ4 AND INCLUDE_LZ4)
set(EXTERNAL_LZ4_LIBRARY_FOUND 1)
add_library (lz4 UNKNOWN IMPORTED)
set_property (TARGET lz4 PROPERTY IMPORTED_LOCATION ${LIBRARY_LZ4})
add_library (lz4 INTERFACE)
set_property (TARGET lz4 PROPERTY INTERFACE_LINK_LIBRARIES ${LIBRARY_LZ4})
set_property (TARGET lz4 PROPERTY INTERFACE_INCLUDE_DIRECTORIES ${INCLUDE_LZ4})
set_property (TARGET lz4 APPEND PROPERTY INTERFACE_COMPILE_DEFINITIONS USE_XXHASH=0)
else()

@ -1 +1 @@
Subproject commit 3f512fedf0ba0f769a1b4852b4bac542d92c5b20
Subproject commit f5638e954a79f50bac7c7a5deaa5a241e0ce8b5f

contrib/openssl vendored

@ -1 +1 @@
Subproject commit 07e9623064508d15dd61367f960ebe7fc9aecd77
Subproject commit 237260dd6a4bca5cb5a321d366a8a9c807957455

debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (20.10.1.1) unstable; urgency=low
clickhouse (20.11.1.1) unstable; urgency=low
* Modified source code
-- clickhouse-release <clickhouse-release@yandex-team.ru> Tue, 08 Sep 2020 17:04:39 +0300
-- clickhouse-release <clickhouse-release@yandex-team.ru> Sat, 10 Oct 2020 18:39:55 +0300

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -1,7 +1,7 @@
FROM ubuntu:20.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
ARG gosu_ver=1.10
RUN apt-get update \

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb https://repo.clickhouse.tech/deb/stable/ main/"
ARG version=20.10.1.*
ARG version=20.11.1.*
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -52,9 +52,9 @@ Optional parameters:
- `rabbitmq_schema` – Parameter that must be used if the format requires a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – The number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` – The number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_queue_base` - Specify a base name for queues that will be declared. By default, queues are declared unique to tables based on db and table names.
- `rabbitmq_queue_base` - Specify a hint for queue names. Use cases of this setting are described below.
- `rabbitmq_deadletter_exchange` - Specify name for a [dead letter exchange](https://www.rabbitmq.com/dlx.html). You can create another table with this exchange name and collect messages in cases when they are republished to dead letter exchange. By default dead letter exchange is not specified.
- `persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_persistent` - If set to 1 (true), in insert query delivery mode will be set to 2 (marks messages as 'persistent'). Default: `0`.
- `rabbitmq_skip_broken_messages` – RabbitMQ message parser tolerance to schema-incompatible messages per block. Default: `0`. If `rabbitmq_skip_broken_messages = N` then the engine skips *N* RabbitMQ messages that cannot be parsed (a message equals a row of data).
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
@ -102,12 +102,12 @@ Exchange type options:
- `fanout` - Routing to all tables (where exchange name is the same) regardless of the keys.
- `topic` - Routing is based on patterns with dot-separated keys. Examples: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
- `headers` - Routing is based on `key=value` matches with a setting `x-match=all` or `x-match=any`. Example table key list: `x-match=all,format=logs,type=report,year=2020`.
- `consistent-hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
- `consistent_hash` - Data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
Setting `rabbitmq_queue_base` may be used for the following cases:
- to let different tables share queues, so that multiple consumers can be registered for the same queues, which improves performance. If using the `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` settings, an exact match of queues is achieved when these parameters are the same.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To be able to resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To be able to resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables. Note: it makes sense only if messages are sent with delivery mode 2 - marked 'persistent', durable.
- to reuse queues as they are declared durable and not auto-deleted.
- to be able to restore reading from certain durable queues when not all messages were successfully consumed. To resume consumption from one specific queue - set its name in `rabbitmq_queue_base` setting and do not specify `rabbitmq_num_consumers` and `rabbitmq_num_queues` (defaults to 1). To resume consumption from all queues, which were declared for a specific table - just specify the same settings: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, queue names will be unique to tables.
- to reuse queues as they are declared durable and not auto-deleted. (Can be deleted via any of the RabbitMQ CLI tools.)
To improve performance, received messages are grouped into blocks the size of [max\_insert\_block\_size](../../../operations/server-configuration-parameters/settings.md#settings-max_insert_block_size). If the block wasn't formed within [stream\_flush\_interval\_ms](../../../operations/server-configuration-parameters/settings.md) milliseconds, the data will be flushed to the table regardless of the completeness of the block.

View File

@ -20,6 +20,7 @@ toc_title: Client Libraries
- [simpod/clickhouse-client](https://packagist.org/packages/simpod/clickhouse-client)
- [seva-code/php-click-house-client](https://packagist.org/packages/seva-code/php-click-house-client)
- [SeasClick C++ client](https://github.com/SeasX/SeasClick)
- [one-ck](https://github.com/lizhichao/one-ck)
- Go
- [clickhouse](https://github.com/kshvakov/clickhouse/)
- [go-clickhouse](https://github.com/roistat/go-clickhouse)

View File

@ -70,6 +70,35 @@ Works with tables in the MergeTree family.
If `force_primary_key=1`, ClickHouse checks to see if the query has a primary key condition that can be used for restricting data ranges. If there is no suitable condition, it throws an exception. However, it does not check whether the condition reduces the amount of data to read. For more information about data ranges in MergeTree tables, see [MergeTree](../../engines/table-engines/mergetree-family/mergetree.md).
## force\_data\_skipping\_indices {#settings-force_data_skipping_indices}
Disables query execution if the passed data skipping indices weren't used.
Consider the following example:
```sql
CREATE TABLE data_01515
(
key Int,
d1 Int,
d1_null Nullable(Int),
INDEX d1_idx d1 TYPE minmax GRANULARITY 1,
INDEX d1_null_idx assumeNotNull(d1_null) TYPE minmax GRANULARITY 1
)
Engine=MergeTree()
ORDER BY key;
SELECT * FROM data_01515;
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices=''; -- query will produce CANNOT_PARSE_TEXT error.
SELECT * FROM data_01515 SETTINGS force_data_skipping_indices='d1_idx'; -- query will produce INDEX_NOT_USED error.
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='d1_idx'; -- Ok.
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`'; -- Ok (example of full featured parser).
SELECT * FROM data_01515 WHERE d1 = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- query will produce INDEX_NOT_USED error, since d1_null_idx is not used.
SELECT * FROM data_01515 WHERE d1 = 0 AND assumeNotNull(d1_null) = 0 SETTINGS force_data_skipping_indices='`d1_idx`, d1_null_idx'; -- Ok.
```
Works with tables in the MergeTree family.
## format\_schema {#format-schema}
This parameter is useful when you are using formats that require a schema definition, such as [Cap'n Proto](https://capnproto.org/) or [Protobuf](https://developers.google.com/protocol-buffers/). The value depends on the format.
@ -1565,7 +1594,7 @@ See also:
## allow\_introspection\_functions {#settings-allow_introspection_functions}
Enables of disables [introspections functions](../../sql-reference/functions/introspection.md) for query profiling.
Enables or disables [introspection functions](../../sql-reference/functions/introspection.md) for query profiling.
Possible values:
@ -2027,3 +2056,14 @@ Result:
```
[Original article](https://clickhouse.tech/docs/en/operations/settings/settings/) <!-- hide -->
## allow_experimental_bigint_types {#allow_experimental_bigint_types}
Enables or disables integer values exceeding the range that is supported by the int data type.
Possible values:
- 1 — The bigint data type is enabled.
- 0 — The bigint data type is disabled.
Default value: `0`.

View File

@ -3,7 +3,7 @@ toc_priority: 42
toc_title: Decimal
---
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimalp-s-decimal32s-decimal64s-decimal128s}
# Decimal(P, S), Decimal32(S), Decimal64(S), Decimal128(S), Decimal256(S) {#decimal}
Signed fixed-point numbers that keep precision during add, subtract and multiply operations. For division least significant digits are discarded (not rounded).
@ -107,4 +107,8 @@ SELECT toDecimal32(1, 8) < 100
DB::Exception: Can't compare.
```
**See also**
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
[Original article](https://clickhouse.tech/docs/en/data_types/decimal/) <!--hide-->

View File

@ -3,7 +3,7 @@ toc_priority: 40
toc_title: UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256
---
# UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 {#uint8-uint16-uint32-uint64-int8-int16-int32-int64}
# UInt8, UInt16, UInt32, UInt64, UInt256, Int8, Int16, Int32, Int64, Int128, Int256 {#uint8-uint16-uint32-uint64-uint256-int8-int16-int32-int64-int128-int256}
Fixed-length integers, with or without a sign.

View File

@ -1495,13 +1495,13 @@ Result:
Returns the current value of a [custom setting](../../operations/settings/index.md#custom_settings).
**Syntax**
**Syntax**
```sql
getSetting('custom_setting');
getSetting('custom_setting');
```
**Parameter**
**Parameter**
- `custom_setting` — The setting name. [String](../../sql-reference/data-types/string.md).
@ -1513,7 +1513,7 @@ getSetting('custom_setting');
```sql
SET custom_a = 123;
SELECT getSetting('custom_a');
SELECT getSetting('custom_a');
```
**Result**
@ -1522,9 +1522,84 @@ SELECT getSetting('custom_a');
123
```
**See Also**
**See Also**
- [Custom Settings](../../operations/settings/index.md#custom_settings)
## isDecimalOverflow {#is-decimal-overflow}
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md) value is out of its (or specified) precision.
**Syntax**
``` sql
isDecimalOverflow(d, [p])
```
**Parameters**
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md).
- `p` — precision. Optional. If omitted, the initial precision of the first argument is used. Using this parameter can be helpful when extracting data to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Returned values**
- `1` — Decimal value has more digits than its precision allows,
- `0` — Decimal value satisfies the specified precision.
**Example**
Query:
``` sql
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
isDecimalOverflow(toDecimal32(1000000000, 0)),
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
isDecimalOverflow(toDecimal32(-1000000000, 0));
```
Result:
``` text
1 1 1 1
```
## countDigits {#count-digits}
Returns the number of decimal digits needed to represent the value.
**Syntax**
``` sql
countDigits(x)
```
**Parameters**
- `x` — [Int](../../sql-reference/data-types/int-uint.md) or [Decimal](../../sql-reference/data-types/decimal.md) value.
**Returned value**
Number of digits.
Type: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
!!! note "Note"
For `Decimal` values, their scale is taken into account: the result is calculated over the underlying integer type, obtained as `(value * scale)`. For example: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. That is, you can check decimal overflow for `Decimal64` with `countDigits(x) > 18`. This is a slower variant of [isDecimalOverflow](#is-decimal-overflow).
**Example**
Query:
``` sql
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
```
Result:
``` text
10 10 19 19 39 39
```
[Original article](https://clickhouse.tech/docs/en/query_language/functions/other_functions/) <!--hide-->

View File

@ -11,7 +11,7 @@ When you convert a value from one to another data type, you should remember that
ClickHouse has the [same behavior as C++ programs](https://en.cppreference.com/w/cpp/language/implicit_conversion).
## toInt(8\|16\|32\|64\|128\|256) {#toint8163264}
## toInt(8\|16\|32\|64\|128\|256) {#toint8163264128256}
Converts an input value to the [Int](../../sql-reference/data-types/int-uint.md) data type. This function family includes:
@ -62,7 +62,7 @@ select toInt64OrZero('123123'), toInt8OrZero('123qwe123')
└─────────────────────────┴───────────────────────────┘
```
## toInt(8\|16\|32\|64\|128\|256)OrNull {#toint8163264ornull}
## toInt(8\|16\|32\|64\|128\|256)OrNull {#toint8163264128256ornull}
It takes an argument of type String and tries to parse it into Int (8 \| 16 \| 32 \| 64 \| 128 \| 256). If failed, returns NULL.
@ -78,7 +78,7 @@ select toInt64OrNull('123123'), toInt8OrNull('123qwe123')
└─────────────────────────┴───────────────────────────┘
```
## toUInt(8\|16\|32\|64\|256) {#touint8163264}
## toUInt(8\|16\|32\|64\|256) {#touint8163264256}
Converts an input value to the [UInt](../../sql-reference/data-types/int-uint.md) data type. This function family includes:
@ -112,9 +112,9 @@ SELECT toUInt64(nan), toUInt32(-32), toUInt16('16'), toUInt8(8.8)
└─────────────────────┴───────────────┴────────────────┴──────────────┘
```
## toUInt(8\|16\|32\|64\|256)OrZero {#touint8163264orzero}
## toUInt(8\|16\|32\|64\|256)OrZero {#touint8163264256orzero}
## toUInt(8\|16\|32\|64\|256)OrNull {#touint8163264ornull}
## toUInt(8\|16\|32\|64\|256)OrNull {#touint8163264256ornull}
## toFloat(32\|64) {#tofloat3264}
@ -134,7 +134,7 @@ SELECT toUInt64(nan), toUInt32(-32), toUInt16('16'), toUInt8(8.8)
## toDateTimeOrNull {#todatetimeornull}
## toDecimal(32\|64\|128\|256) {#todecimal3264128}
## toDecimal(32\|64\|128\|256) {#todecimal3264128256}
Converts `value` to the [Decimal](../../sql-reference/data-types/decimal.md) data type with precision of `S`. The `value` can be a number or a string. The `S` (scale) parameter specifies the number of decimal places.
@ -143,7 +143,7 @@ Converts `value` to the [Decimal](../../sql-reference/data-types/decimal.md) dat
- `toDecimal128(value, S)`
- `toDecimal256(value, S)`
## toDecimal(32\|64\|128\|256)OrNull {#todecimal3264128ornull}
## toDecimal(32\|64\|128\|256)OrNull {#todecimal3264128256ornull}
Converts an input string to a [Nullable(Decimal(P,S))](../../sql-reference/data-types/decimal.md) data type value. This family of functions include:
@ -188,7 +188,7 @@ SELECT toDecimal32OrNull(toString(-1.111), 2) AS val, toTypeName(val)
└──────┴────────────────────────────────────────────────────┘
```
## toDecimal(32\|64\|128\|256)OrZero {#todecimal3264128orzero}
## toDecimal(32\|64\|128\|256)OrZero {#todecimal3264128256orzero}
Converts an input value to the [Decimal(P,S)](../../sql-reference/data-types/decimal.md) data type. This family of functions include:

View File

@ -121,7 +121,9 @@ Defines storage time for values. Can be specified only for MergeTree-family tabl
## Column Compression Codecs {#codecs}
By default, ClickHouse applies the `lz4` compression method. For `MergeTree`-engine family you can change the default compression method in the [compression](../../../operations/server-configuration-parameters/settings.md#server-settings-compression) section of a server configuration. You can also define the compression method for each individual column in the `CREATE TABLE` query.
By default, ClickHouse applies the `lz4` compression method. For `MergeTree`-engine family you can change the default compression method in the [compression](../../../operations/server-configuration-parameters/settings.md#server-settings-compression) section of a server configuration.
You can also define the compression method for each individual column in the `CREATE TABLE` query.
``` sql
CREATE TABLE codec_example
@ -136,7 +138,18 @@ ENGINE = <Engine>
...
```
If a codec is specified, the default codec doesn't apply. Codecs can be combined in a pipeline, for example, `CODEC(Delta, ZSTD)`. To select the best codec combination for your project, run benchmarks similar to those described in the Altinity [New Encodings to Improve ClickHouse Efficiency](https://www.altinity.com/blog/2019/7/new-encodings-to-improve-clickhouse) article. One thing to note is that codecs can't be applied to ALIAS column types.
The `Default` codec can be specified to reference the default compression, which may depend on different settings (and properties of data) at runtime.
Example: `value UInt64 CODEC(Default)` - the same as specifying no codec at all.
You can also remove the current CODEC from a column and use the default compression from config.xml:
``` sql
ALTER TABLE codec_example MODIFY COLUMN float_value CODEC(Default);
```
Codecs can be combined in a pipeline, for example, `CODEC(Delta, Default)`.
To select the best codec combination for your project, run benchmarks similar to those described in the Altinity [New Encodings to Improve ClickHouse Efficiency](https://www.altinity.com/blog/2019/7/new-encodings-to-improve-clickhouse) article. One thing to note is that codecs can't be applied to ALIAS column types.
!!! warning "Warning"
You can't decompress ClickHouse database files with external utilities like `lz4`. Instead, use the special [clickhouse-compressor](https://github.com/ClickHouse/ClickHouse/tree/master/programs/compressor) utility.

View File

@ -1,8 +1,3 @@
---
toc_priority: 6
toc_title: RabbitMQ
---
# RabbitMQ {#rabbitmq-engine}
The engine works with [RabbitMQ](https://www.rabbitmq.com).
@ -27,9 +22,14 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
[rabbitmq_exchange_type = 'exchange_type',]
[rabbitmq_routing_key_list = 'key1,key2,...',]
[rabbitmq_row_delimiter = 'delimiter_symbol',]
[rabbitmq_schema = '',]
[rabbitmq_num_consumers = N,]
[rabbitmq_num_queues = N,]
[rabbitmq_transactional_channel = 0]
[rabbitmq_queue_base = 'queue',]
[rabbitmq_persistent = 0,]
[rabbitmq_skip_broken_messages = N,]
[rabbitmq_max_block_size = N,]
[rabbitmq_flush_interval_ms = N]
```
Required parameters:
@ -40,12 +40,17 @@ CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
Optional parameters:
- `rabbitmq_exchange_type` – the exchange type in RabbitMQ: `direct`, `fanout`, `topic`, `headers`, `consistent-hash`. Default: `fanout`.
- `rabbitmq_exchange_type` – the exchange type in RabbitMQ: `direct`, `fanout`, `topic`, `headers`, `consistent_hash`. Default: `fanout`.
- `rabbitmq_routing_key_list` – a comma-separated list of routing keys.
- `rabbitmq_row_delimiter` – the delimiter character that terminates a message.
- `rabbitmq_schema` – an optional parameter, required if the format needs a schema definition. For example, [Cap'n Proto](https://capnproto.org/) requires the path to the schema file and the name of the root `schema.capnp:Message` object.
- `rabbitmq_num_consumers` – the number of consumers per table. Default: `1`. Specify more consumers if the throughput of one consumer is insufficient.
- `rabbitmq_num_queues` – the number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient. A single queue holds up to 50 thousand messages at a time.
- `rabbitmq_transactional_channel` – wrap `INSERT` queries in transactions. Default: `0`.
- `rabbitmq_num_queues` – the number of queues per consumer. Default: `1`. Specify more queues if the capacity of one queue per consumer is insufficient.
- `rabbitmq_queue_base` - a hint for queue names. Use cases for this setting are described below.
- `rabbitmq_persistent` - a flag that controls the 'durable' setting for messages in `INSERT` queries. Default: `0`.
- `rabbitmq_skip_broken_messages` – the maximum number of schema-incompatible messages per block. If `rabbitmq_skip_broken_messages = N`, the engine skips `N` messages that could not be parsed. One message corresponds to exactly one row. Default: `0`.
- `rabbitmq_max_block_size`
- `rabbitmq_flush_interval_ms`
Required configuration:
@ -90,15 +95,23 @@ Example:
- `fanout` - routing to all tables (where the exchange name is the same) regardless of the keys.
- `topic` - routing is based on patterns with dot-separated keys. Examples: `*.logs`, `records.*.*.2020`, `*.2018,*.2019,*.2020`.
- `headers` - routing is based on `key=value` matches with the `x-match=all` or `x-match=any` setting. Example table key list: `x-match=all,format=logs,type=report,year=2020`.
- `consistent-hash` - data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with the RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
- `consistent_hash` - data is evenly distributed between all bound tables (where the exchange name is the same). Note that this exchange type must be enabled with the RabbitMQ plugin: `rabbitmq-plugins enable rabbitmq_consistent_hash_exchange`.
If the exchange type is not specified, `fanout` is used by default. In that case, the routing keys for publishing data must be randomized in the range `[1, num_consumers]` for each message/batch (or in the range `[1, num_consumers * num_queues]` if `rabbitmq_num_queues` is set). This table configuration works faster than any other, especially when the `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` parameters are set.
The `rabbitmq_queue_base` setting can be used in the following cases:
1. to restore reading from previously declared queues if it stopped for any reason but the queues remained non-empty. To resume reading from one specific queue, put its name in the `rabbitmq_queue_base` setting and do not specify the `rabbitmq_num_consumers` and `rabbitmq_num_queues` settings. To resume reading from all queues declared for a specific table, the following settings must match: `rabbitmq_queue_base`, `rabbitmq_num_consumers`, `rabbitmq_num_queues`. By default, if the `rabbitmq_queue_base` setting is not specified, queue names unique to each table are used.
2. to declare the same queues for different tables, which allows creating multiple parallel consumers for each queue, giving better performance. In this case, such tables must also have matching settings: `rabbitmq_num_consumers`, `rabbitmq_num_queues`.
3. to reuse queues created with the `durable` setting, since they are not deleted automatically (but can be deleted via any RabbitMQ CLI tool).
To improve performance, received messages are grouped into blocks of size [max\_insert\_block\_size](../../../operations/settings/settings.md#settings-max_insert_block_size). If a block was not formed within [stream\_flush\_interval\_ms](../../../operations/settings/settings.md#stream-flush-interval-ms) milliseconds, the data is flushed to the table regardless of the completeness of the block.
If the `rabbitmq_num_consumers` and/or `rabbitmq_num_queues` parameters are set together with the `rabbitmq_exchange_type` parameter, then:
- the `rabbitmq-consistent-hash-exchange` plugin must be enabled;
- the `message_id` property of published messages must be defined (unique for each message/batch).
For `INSERT` queries, metadata is added to each published message: `messageID` and a `republished` flag, available via the message headers.
Do not use the same table for reads and inserts.
Example:
``` sql
@ -120,3 +133,11 @@ Example:
SELECT key, value FROM daily ORDER BY key;
```
## Virtual Columns {#virtual-columns}
- `_exchange_name` - the RabbitMQ exchange name.
- `_channel_id` - the `ChannelID` of the channel on which the message was received.
- `_delivery_tag` - the `DeliveryTag` value of the received message. Unique within one channel.
- `_redelivered` - the `redelivered` flag. (Non-zero if the message may have been received by more than one channel.)
- `_message_id` - the `MessageID` value of the received message. Non-empty if it was set when the message was published.

View File

@ -102,4 +102,9 @@ SELECT toDecimal32(1, 8) < 100
DB::Exception: Can't compare.
```
**See also**
- [isDecimalOverflow](../../sql-reference/functions/other-functions.md#is-decimal-overflow)
- [countDigits](../../sql-reference/functions/other-functions.md#count-digits)
[Original article](https://clickhouse.tech/docs/ru/data_types/decimal/) <!--hide-->

View File

@ -1431,5 +1431,80 @@ SELECT randomStringUTF8(13)
```
## isDecimalOverflow {#is-decimal-overflow}
Checks whether the [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value is out of its own (or the specified) precision.
**Syntax**
``` sql
isDecimalOverflow(d, [p])
```
**Parameters**
- `d` — value. [Decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s).
- `p` — precision. Optional. If omitted, the initial precision of the first argument is used. Using this parameter can be helpful when extracting data to another DBMS or file. [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
**Returned value**
- `1` — the value has more digits than its precision allows.
- `0` — the value satisfies the specified precision.
**Example**
Query:
``` sql
SELECT isDecimalOverflow(toDecimal32(1000000000, 0), 9),
isDecimalOverflow(toDecimal32(1000000000, 0)),
isDecimalOverflow(toDecimal32(-1000000000, 0), 9),
isDecimalOverflow(toDecimal32(-1000000000, 0));
```
Result:
``` text
1 1 1 1
```
## countDigits {#count-digits}
Returns the number of decimal digits needed to represent the value.
**Syntax**
``` sql
countDigits(x)
```
**Parameters**
- `x` — an [integer](../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [decimal](../../sql-reference/data-types/decimal.md#decimalp-s-decimal32s-decimal64s-decimal128s) value.
**Returned value**
The number of digits.
Type: [UInt8](../../sql-reference/data-types/int-uint.md#uint-ranges).
!!! note "Note"
For `Decimal` values, their scale is taken into account: the result is calculated over the underlying integer type, obtained as `(value * scale)`. For example: `countDigits(42) = 2`, `countDigits(42.000) = 5`, `countDigits(0.04200) = 4`. That is, you can check decimal overflow for `Decimal64` with `countDigits(x) > 18`. This is a slower variant of [isDecimalOverflow](#is-decimal-overflow).
**Example**
Query:
``` sql
SELECT countDigits(toDecimal32(1, 9)), countDigits(toDecimal32(-1, 9)),
countDigits(toDecimal64(1, 18)), countDigits(toDecimal64(-1, 18)),
countDigits(toDecimal128(1, 38)), countDigits(toDecimal128(-1, 38));
```
Result:
``` text
10 10 19 19 39 39
```
[Original article](https://clickhouse.tech/docs/ru/query_language/functions/other_functions/) <!--hide-->

View File

@ -24,14 +24,14 @@ toc_title: "\u0412\u0432\u0435\u0434\u0435\u043D\u0438\u0435"
| Function | Description |
|-----------------------|---------------------------------------------------------------------------------------------------------------------------------------|
| [file](file.md) | Creates a table with the [File](../../sql-reference/table-functions/index.md) engine. |
| [merge](merge.md) | Creates a table with the [Merge](../../sql-reference/table-functions/index.md) engine. |
| [file](file.md) | Creates a table with the [File](../../engines/table-engines/special/file.md) engine. |
| [merge](merge.md) | Creates a table with the [Merge](../../engines/table-engines/special/merge.md) engine. |
| [numbers](numbers.md) | Creates a table with a single column filled with integer numbers. |
| [remote](remote.md) | Provides access to remote servers without creating a table with the [Distributed](../../sql-reference/table-functions/index.md) engine. |
| [url](url.md) | Creates a table with the [Url](../../sql-reference/table-functions/index.md) engine. |
| [mysql](mysql.md) | Creates a table with the [MySQL](../../sql-reference/table-functions/index.md) engine. |
| [jdbc](jdbc.md) | Creates a table with the [JDBC](../../sql-reference/table-functions/index.md) engine. |
| [odbc](odbc.md) | Creates a table with the [ODBC](../../sql-reference/table-functions/index.md) engine. |
| [hdfs](hdfs.md) | Creates a table with the [HDFS](../../sql-reference/table-functions/index.md) engine. |
| [remote](remote.md) | Provides access to remote servers without creating a table with the [Distributed](../../engines/table-engines/special/distributed.md) engine. |
| [url](url.md) | Creates a table with the [Url](../../engines/table-engines/special/url.md) engine. |
| [mysql](mysql.md) | Creates a table with the [MySQL](../../engines/table-engines/integrations/mysql.md) engine. |
| [jdbc](jdbc.md) | Creates a table with the [JDBC](../../engines/table-engines/integrations/jdbc.md) engine. |
| [odbc](odbc.md) | Creates a table with the [ODBC](../../engines/table-engines/integrations/odbc.md) engine. |
| [hdfs](hdfs.md) | Creates a table with the [HDFS](../../engines/table-engines/integrations/hdfs.md) engine. |
[Original article](https://clickhouse.tech/docs/ru/query_language/table_functions/) <!--hide-->

View File

@ -468,7 +468,7 @@ clickhouse-client --query "INSERT INTO tutorial.hits_v1 FORMAT TSV" --max_insert
clickhouse-client --query "INSERT INTO tutorial.visits_v1 FORMAT TSV" --max_insert_block_size=100000 < visits_v1.tsv
```
ClickHouse has many [settings to tune](../operations/settings/index.md). One way to specify them in the console client is via arguments, as we can see with `--max_insert_block_size`. The easiest way to find out the available settings, what they mean and their default values is to query the `system.settings` table:
ClickHouse has many [settings to tune](../operations/settings/index.md). One way to specify them in the console client is via arguments, as seen with `--max_insert_block_size` in the statement above. The easiest way to find out the available settings, their meaning and their default values is to query the `system.settings` table:
``` sql
SELECT name, value, changed, description
@ -479,7 +479,7 @@ FORMAT TSV
max_insert_block_size 1048576 0 "The maximum block size for insertion, if we control the creation of blocks for insertion."
```
You can also [OPTIMIZE](../sql-reference/statements/misc.md#misc_operations-optimize) the tables after import. Tables configured with an engine from the MergeTree family always merge data parts in the background to optimize data storage (or at least check whether it makes sense). These queries force the table engine to do storage optimization right now instead of some time later:
You can also [OPTIMIZE](../sql-reference/statements/misc.md#misc_operations-optimize) the tables after import. Tables configured with an engine from the MergeTree family always merge data parts in the background to optimize data storage (or at least check whether it makes sense). These queries force the table engine to do storage optimization right now instead of executing it some time later:
``` bash
clickhouse-client --query "OPTIMIZE TABLE tutorial.hits_v1 FINAL"
@ -521,14 +521,14 @@ WHERE (CounterID = 912887) AND (toYYYYMM(StartDate) = 201403) AND (domain(StartU
A ClickHouse cluster is a homogeneous cluster. Setup steps:
1. Install ClickHouse server on all the computers of the cluster
1. Install ClickHouse server on all the machines of the cluster
2. Set up cluster configs in configuration files
3. Create local tables on each instance
4. Create a [Distributed table](../engines/table-engines/special/distributed.md)
A [Distributed table](../engines/table-engines/special/distributed.md) is actually a kind of "view" onto the local tables of a ClickHouse cluster. A SELECT query from a distributed table executes using the resources of all the cluster's shards. You can specify configs for multiple clusters and create multiple distributed tables that provide views of different clusters.
A [Distributed table](../engines/table-engines/special/distributed.md) is actually a kind of "view" mapped onto the local tables of a ClickHouse cluster. A **SELECT** query on a distributed table executes using the resources of all the cluster's shards. You can specify configs for multiple clusters and create multiple distributed tables that provide views of different clusters.
Example config for a cluster with three shards, with one replica each:
Example config for a cluster with three shards, one replica each:
``` xml
<remote_servers>
@ -555,7 +555,7 @@ ClickHouse集群是一个同质集群。 设置步骤:
</remote_servers>
```
For further demonstration, let's create a new local table with the `CREATE TABLE` that we used for `hits_v1`, but with a different table name:
For further demonstration, let's create a new local table using the same `CREATE TABLE` statement that we used to create `hits_v1`, but with a different table name:
``` sql
CREATE TABLE tutorial.hits_local (...) ENGINE = MergeTree() ...
@ -570,14 +570,14 @@ ENGINE = Distributed(perftest_3shards_1replicas, tutorial, hits_local, rand());
A common practice is to create similar distributed tables on all machines of the cluster. It allows running distributed queries on any machine of the cluster. There is also an alternative option to create a temporary distributed table for a given SELECT query using the [remote](../sql-reference/table-functions/remote.md) table function.
Let's go with [INSERT SELECT](../sql-reference/statements/insert-into.md) to spread the table to multiple servers.
Let's run [INSERT SELECT](../sql-reference/statements/insert-into.md) into the distributed table to spread the table to multiple servers.
``` sql
INSERT INTO tutorial.hits_all SELECT * FROM tutorial.hits_v1;
```
!!! warning "碌莽禄Notice:"
这种方法不适合大型表的分片。 有一个单独的工具 [ツ环板-ョツ嘉ッツ偲](../operations/utilities/clickhouse-copier.md) 这可以重新分片任意大表。
!!! warning "注意:"
这种方法不适合大型表的分片。 有一个单独的工具 [clickhouse-copier](../operations/utilities/clickhouse-copier.md) 这可以重新分片任意大表。
正如您所期望的那样如果计算量大的查询使用3台服务器而不是一个则运行速度快N倍。
@ -609,10 +609,10 @@ INSERT INTO tutorial.hits_all SELECT * FROM tutorial.hits_v1;
</remote_servers>
```
To enable native replication, [ZooKeeper](http://zookeeper.apache.org/) is required. ClickHouse takes care of data consistency on all replicas and runs the restore procedure automatically after a failure. It is recommended to deploy the ZooKeeper cluster on separate servers (where no other processes, including ClickHouse, are running).
To enable native replication, [ZooKeeper](http://zookeeper.apache.org/) is required. ClickHouse takes care of data consistency on all replicas and runs the restore procedure automatically after a failure. It is recommended to deploy the ZooKeeper cluster on separate servers where no other processes (including ClickHouse) are running.
!!! note "Note"
ZooKeeper is not a strict requirement: in some simple cases, you can duplicate the data by writing it to all replicas from your application code. This approach is **not** recommended; in this case, ClickHouse won't be able to guarantee data consistency on all replicas. Thus, it becomes the responsibility of your application.
ZooKeeper is not a strict requirement: in some simple cases, you can duplicate the data by writing it to all replicas from your application code. This approach is **not** recommended; in this case, ClickHouse won't be able to guarantee data consistency on all replicas. This becomes the responsibility of your application.
ZooKeeper locations are specified in the configuration file:

View File

@ -134,7 +134,6 @@ private:
bool stdout_is_a_tty = false; /// stdout is a terminal.
std::unique_ptr<Connection> connection; /// Connection to DB.
String query_id; /// Current query_id.
String full_query; /// Current query as it was given to the client.
// Current query as it will be sent to the server. It may differ from the
@ -219,6 +218,9 @@ private:
QueryFuzzer fuzzer;
int query_fuzzer_runs = 0;
/// We will format query_id in interactive mode in various ways, the default is just to print Query id: ...
std::vector<std::pair<String, String>> query_id_formats;
void initialize(Poco::Util::Application & self) override
{
Poco::Util::Application::initialize(self);
@ -243,6 +245,17 @@ private:
/// Set path for format schema files
if (config().has("format_schema_path"))
context.setFormatSchemaPath(Poco::Path(config().getString("format_schema_path")).toString());
/// Initialize query_id_formats if any
if (config().has("query_id_formats"))
{
Poco::Util::AbstractConfiguration::Keys keys;
config().keys("query_id_formats", keys);
for (const auto & name : keys)
query_id_formats.emplace_back(name + ":", config().getString("query_id_formats." + name));
}
if (query_id_formats.empty())
query_id_formats.emplace_back("Query id:", " {query_id}\n");
}
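The `{query_id}` placeholder above is filled in with fmt's named-argument mechanism. A minimal sketch of that substitution, assuming only the fmt library (the format string would normally come from the `query_id_formats` config, and the id from the server context):

``` cpp
#include <fmt/format.h>
#include <iostream>

int main()
{
    // Hypothetical id; the client takes the real one from context.getCurrentQueryId().
    std::string id = "7cc8f6ef-0a0b-4dff-a491-8aa585da0a05";
    // fmt::arg binds the value to the {query_id} placeholder by name.
    std::cout << "Query id:" << fmt::format(" {query_id}\n", fmt::arg("query_id", id));
}
```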
@ -559,7 +572,7 @@ private:
if (is_interactive)
{
if (!query_id.empty())
if (config().has("query_id"))
throw Exception("query_id could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
if (print_time_to_stderr)
throw Exception("time option could be specified only in non-interactive mode", ErrorCodes::BAD_ARGUMENTS);
@ -665,7 +678,9 @@ private:
}
else
{
query_id = config().getString("query_id", "");
auto query_id = config().getString("query_id", "");
if (!query_id.empty())
context.setCurrentQueryId(query_id);
if (query_fuzzer_runs)
{
nonInteractiveWithFuzzing();
@ -1274,6 +1289,19 @@ private:
std_out.next();
}
if (is_interactive)
{
// Generate a new query_id
context.setCurrentQueryId("");
for (const auto & query_id_format : query_id_formats)
{
writeString(query_id_format.first, std_out);
writeString(fmt::format(query_id_format.second, fmt::arg("query_id", context.getCurrentQueryId())), std_out);
writeChar('\n', std_out);
std_out.next();
}
}
watch.restart();
processed_rows = 0;
progress.reset();
@ -1399,7 +1427,7 @@ private:
connection->sendQuery(
connection_parameters.timeouts,
query_to_send,
query_id,
context.getCurrentQueryId(),
QueryProcessingStage::Complete,
&context.getSettingsRef(),
&context.getClientInfo(),
@ -1440,7 +1468,7 @@ private:
connection->sendQuery(
connection_parameters.timeouts,
query_to_send,
query_id,
context.getCurrentQueryId(),
QueryProcessingStage::Complete,
&context.getSettingsRef(),
&context.getClientInfo(),

View File

@ -168,6 +168,26 @@ ASTPtr extractOrderBy(const ASTPtr & storage_ast)
throw Exception("ORDER BY cannot be empty", ErrorCodes::BAD_ARGUMENTS);
}
/// Wraps only identifiers with backticks.
std::string wrapIdentifiersWithBackticks(const ASTPtr & root)
{
if (auto identifier = std::dynamic_pointer_cast<ASTIdentifier>(root))
return backQuote(identifier->name);
if (auto function = std::dynamic_pointer_cast<ASTFunction>(root))
return function->name + '(' + wrapIdentifiersWithBackticks(function->arguments) + ')';
if (auto expression_list = std::dynamic_pointer_cast<ASTExpressionList>(root))
{
Names function_arguments(expression_list->children.size());
for (size_t i = 0; i < expression_list->children.size(); ++i)
function_arguments[i] = wrapIdentifiersWithBackticks(expression_list->children[i]);
return boost::algorithm::join(function_arguments, ", ");
}
throw Exception("Primary key could be represented only as columns or functions from columns.", ErrorCodes::BAD_ARGUMENTS);
}
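To illustrate what the recursion above produces: an identifier becomes a back-quoted identifier, a function keeps its name and recurses into its arguments, and an expression list joins its wrapped children with commas, so a sorting key like `toDate(EventDate)` becomes ``toDate(`EventDate`)``. A standalone sketch of the same logic with a toy AST (not ClickHouse's real AST classes):

``` cpp
#include <iostream>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

struct Node { virtual ~Node() = default; };
struct Identifier : Node
{
    std::string name;
    explicit Identifier(std::string n) : name(std::move(n)) {}
};
struct Function : Node
{
    std::string name;
    std::vector<std::shared_ptr<Node>> args;
};

std::string wrap(const std::shared_ptr<Node> & node)
{
    if (auto id = std::dynamic_pointer_cast<Identifier>(node))
        return "`" + id->name + "`";           // identifiers get backticks
    if (auto fn = std::dynamic_pointer_cast<Function>(node))
    {
        std::string joined;
        for (size_t i = 0; i < fn->args.size(); ++i)
            joined += (i ? ", " : "") + wrap(fn->args[i]);
        return fn->name + "(" + joined + ")";  // function names stay as-is
    }
    throw std::runtime_error("unsupported node");
}

int main()
{
    auto fn = std::make_shared<Function>();
    fn->name = "toDate";
    fn->args.push_back(std::make_shared<Identifier>("EventDate"));
    std::cout << wrap(fn) << "\n"; // prints: toDate(`EventDate`)
}
```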
Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
{
@ -189,13 +209,14 @@ Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
ErrorCodes::BAD_ARGUMENTS);
Names primary_key_columns;
Names sorting_key_columns;
NameSet primary_key_columns_set;
for (size_t i = 0; i < sorting_key_size; ++i)
{
/// Column name could be represented as a f_1(f_2(...f_n(column_name))).
/// Each f_i could take one or more parameters.
/// We will wrap identifiers with backticks to allow non-standard identifier names.
String sorting_key_column = sorting_key_expr_list->children[i]->getColumnName();
sorting_key_columns.push_back(sorting_key_column);
if (i < primary_key_size)
{
@ -208,7 +229,7 @@ Names extractPrimaryKeyColumnNames(const ASTPtr & storage_ast)
if (!primary_key_columns_set.emplace(pk_column).second)
throw Exception("Primary key contains duplicate columns", ErrorCodes::BAD_ARGUMENTS);
primary_key_columns.push_back(pk_column);
primary_key_columns.push_back(wrapIdentifiersWithBackticks(primary_key_expr_list->children[i]));
}
}

View File

@ -6,6 +6,9 @@
#include <Core/Defines.h>
#include <ext/map.h>
#include <boost/algorithm/string/join.hpp>
namespace DB
{
@ -269,7 +272,7 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
ParserStorage parser_storage;
engine_push_ast = parseQuery(parser_storage, engine_push_str, 0, DBMS_DEFAULT_MAX_PARSER_DEPTH);
engine_push_partition_key_ast = extractPartitionKey(engine_push_ast);
primary_key_comma_separated = Nested::createCommaSeparatedStringFrom(extractPrimaryKeyColumnNames(engine_push_ast));
primary_key_comma_separated = boost::algorithm::join(extractPrimaryKeyColumnNames(engine_push_ast), ", ");
is_replicated_table = isReplicatedTableEngine(engine_push_ast);
}

View File

@ -267,12 +267,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
registerDictionaries();
registerDisks();
#if !defined(ARCADIA_BUILD)
#if USE_OPENCL
BitonicSort::getInstance().configure();
#endif
#endif
CurrentMetrics::set(CurrentMetrics::Revision, ClickHouseRevision::getVersionRevision());
CurrentMetrics::set(CurrentMetrics::VersionInteger, ClickHouseRevision::getVersionInteger());

View File

@ -143,13 +143,13 @@ void LinearModelData::updateState()
void LinearModelData::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
const Context & context) const
{
gradient_computer->predict(container, block, offset, limit, arguments, weights, bias, context);
gradient_computer->predict(container, columns, offset, limit, arguments, weights, bias, context);
}
void LinearModelData::returnWeights(IColumn & to) const
@ -449,7 +449,7 @@ void IWeightsUpdater::addToBatch(
void LogisticRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -457,7 +457,7 @@ void LogisticRegression::predict(
Float64 bias,
const Context & /*context*/) const
{
size_t rows_num = block.rows();
size_t rows_num = columns[arguments.front()].column->size();
if (offset > rows_num || offset + limit > rows_num)
throw Exception("Invalid offset and limit for LogisticRegression::predict. "
@ -468,7 +468,7 @@ void LogisticRegression::predict(
for (size_t i = 1; i < arguments.size(); ++i)
{
const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
const ColumnWithTypeAndName & cur_col = columns[arguments[i]];
if (!isNativeNumber(cur_col.type))
throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);
@ -518,7 +518,7 @@ void LogisticRegression::compute(
void LinearRegression::predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -531,7 +531,7 @@ void LinearRegression::predict(
throw Exception("In predict function number of arguments differs from the size of weights vector", ErrorCodes::LOGICAL_ERROR);
}
size_t rows_num = block.rows();
size_t rows_num = columns[arguments.front()].column->size();
if (offset > rows_num || offset + limit > rows_num)
throw Exception("Invalid offset and limit for LogisticRegression::predict. "
@ -542,7 +542,7 @@ void LinearRegression::predict(
for (size_t i = 1; i < arguments.size(); ++i)
{
const ColumnWithTypeAndName & cur_col = block.getByPosition(arguments[i]);
const ColumnWithTypeAndName & cur_col = columns[arguments[i]];
if (!isNativeNumber(cur_col.type))
throw Exception("Prediction arguments must have numeric type", ErrorCodes::BAD_ARGUMENTS);

View File

@ -39,7 +39,7 @@ public:
virtual void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -65,7 +65,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -91,7 +91,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -264,7 +264,7 @@ public:
void predict(
ColumnVector<Float64>::Container & container,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -364,7 +364,7 @@ public:
void predictValues(
ConstAggregateDataPtr place,
IColumn & to,
Block & block,
ColumnsWithTypeAndName & columns,
size_t offset,
size_t limit,
const ColumnNumbers & arguments,
@ -382,7 +382,7 @@ public:
throw Exception("Cast of column of predictions is incorrect. getReturnTypeToPredict must return same value as it is casted to",
ErrorCodes::LOGICAL_ERROR);
this->data(place).predict(column->getData(), block, offset, limit, arguments, context);
this->data(place).predict(column->getData(), columns, offset, limit, arguments, context);
}
/** This function is called if aggregate function without State modifier is selected in a query.

View File

@ -1,4 +1,3 @@
#pragma once
#include <DataTypes/DataTypeAggregateFunction.h>

View File

@ -114,7 +114,7 @@ public:
virtual void predictValues(
ConstAggregateDataPtr /* place */,
IColumn & /*to*/,
Block & /*block*/,
ColumnsWithTypeAndName & /*block*/,
size_t /*offset*/,
size_t /*limit*/,
const ColumnNumbers & /*arguments*/,

View File

@ -161,7 +161,7 @@ MutableColumnPtr ColumnAggregateFunction::convertToValues(MutableColumnPtr colum
return res;
}
MutableColumnPtr ColumnAggregateFunction::predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const
MutableColumnPtr ColumnAggregateFunction::predictValues(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, const Context & context) const
{
MutableColumnPtr res = func->getReturnTypeToPredict()->createColumn();
res->reserve(data.size());
@ -172,7 +172,7 @@ MutableColumnPtr ColumnAggregateFunction::predictValues(Block & block, const Col
if (data.size() == 1)
{
/// Case for const column. Predict using single model.
machine_learning_function->predictValues(data[0], *res, block, 0, block.rows(), arguments, context);
machine_learning_function->predictValues(data[0], *res, block, 0, block[arguments.front()].column->size(), arguments, context);
}
else
{

View File

@ -119,7 +119,7 @@ public:
const char * getFamilyName() const override { return "AggregateFunction"; }
TypeIndex getDataType() const override { return TypeIndex::AggregateFunction; }
MutableColumnPtr predictValues(Block & block, const ColumnNumbers & arguments, const Context & context) const;
MutableColumnPtr predictValues(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, const Context & context) const;
size_t size() const override
{

View File

@ -187,16 +187,16 @@ ColumnWithTypeAndName ColumnFunction::reduce() const
throw Exception("Cannot call function " + function->getName() + " because is has " + toString(args) +
"arguments but " + toString(captured) + " columns were captured.", ErrorCodes::LOGICAL_ERROR);
Block block(captured_columns);
block.insert({nullptr, function->getReturnType(), ""});
auto columns = captured_columns;
columns.emplace_back(ColumnWithTypeAndName {nullptr, function->getReturnType(), ""});
ColumnNumbers arguments(captured_columns.size());
for (size_t i = 0; i < captured_columns.size(); ++i)
arguments[i] = i;
function->execute(block, arguments, captured_columns.size(), size_);
function->execute(columns, arguments, captured_columns.size(), size_);
return block.getByPosition(captured_columns.size());
return columns[captured_columns.size()];
}
}

View File

@ -1,3 +1,4 @@
#pragma once
/**
* This file implements template methods of IColumn that depend on other types
* we don't want to include.
@ -5,8 +6,6 @@
* implementation.
*/
#pragma once
#include <Columns/IColumn.h>
#include <Common/PODArray.h>

View File

@ -1,7 +1,7 @@
#pragma once
/**
* This file provides forward declarations for Allocator.
*/
#pragma once
template <bool clear_memory_, bool mmap_populate = false>
class Allocator;

View File

@ -1,3 +1,4 @@
#pragma once
#include <Common/Arena.h>
#include <Common/Allocator.h>

View File

@ -34,13 +34,6 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MemoryTrackingInBackgroundProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for background merges, mutations and fetches). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundMoveProcessingPool, "Total amount of memory (bytes) allocated in background processing pool (that is dedicated for background moves). Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(MemoryTrackingInBackgroundSchedulePool, "Total amount of memory (bytes) allocated in background schedule pool (that is dedicated for bookkeeping tasks of Replicated tables).") \
M(MemoryTrackingInBackgroundBufferFlushSchedulePool, "Total amount of memory (bytes) allocated in background buffer flushes pool (that is dedicated for background buffer flushes).") \
M(MemoryTrackingInBackgroundDistributedSchedulePool, "Total amount of memory (bytes) allocated in background distributed schedule pool (that is dedicated for distributed sends).") \
M(MemoryTrackingInBackgroundMessageBrokerSchedulePool, "Total amount of memory (bytes) allocated in background message broker pool (that is dedicated for background message streaming).") \
M(MemoryTrackingForMerges, "Total amount of memory (bytes) allocated for background merges. Included in MemoryTrackingInBackgroundProcessingPool. Note that this value may include a drift when the memory was allocated in a context of background processing pool and freed in other context or vice-versa. This happens naturally due to caches for tables indexes and doesn't indicate memory leaks.") \
M(EphemeralNode, "Number of ephemeral nodes held in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \

View File

@ -73,7 +73,7 @@ inline DB::UInt64 intHashCRC32(DB::UInt64 x, DB::UInt64 updated_value)
}
template <typename T>
inline typename std::enable_if<(sizeof(T) > sizeof(DB::UInt64)) && !is_big_int_v<T>, DB::UInt64>::type
inline typename std::enable_if<(sizeof(T) > sizeof(DB::UInt64)), DB::UInt64>::type
intHashCRC32(const T & x, DB::UInt64 updated_value)
{
auto * begin = reinterpret_cast<const char *>(&x);
@ -86,16 +86,6 @@ intHashCRC32(const T & x, DB::UInt64 updated_value)
return updated_value;
}
template <typename T>
inline typename std::enable_if<is_big_int_v<T>, DB::UInt64>::type
intHashCRC32(const T & x, DB::UInt64 updated_value)
{
std::vector<UInt64> parts = DB::BigInt<T>::toIntArray(x);
for (const auto & part : parts)
updated_value = intHashCRC32(part, updated_value);
return updated_value;
}
inline UInt32 updateWeakHash32(const DB::UInt8 * pos, size_t size, DB::UInt32 updated_value)
{
@ -248,22 +238,7 @@ inline size_t hashCRC32(std::enable_if_t<(sizeof(T) <= sizeof(UInt64)), T> key)
template <typename T>
inline size_t hashCRC32(std::enable_if_t<(sizeof(T) > sizeof(UInt64)), T> key)
{
if constexpr (std::is_same_v<T, DB::Int128>)
{
return intHashCRC32(static_cast<UInt64>(key) ^ static_cast<UInt64>(key >> 64));
}
else if constexpr (std::is_same_v<T, DB::UInt128>)
{
return intHashCRC32(key.low ^ key.high);
}
else if constexpr (is_big_int_v<T> && sizeof(T) == 32)
{
return intHashCRC32(static_cast<UInt64>(key) ^
static_cast<UInt64>(key >> 64) ^
static_cast<UInt64>(key >> 128) ^
static_cast<UInt64>(key >> 256));
}
__builtin_unreachable();
return intHashCRC32(key, -1);
}
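With the big-integer specializations removed, wide integers now take the generic byte-wise path kept above: fold CRC32 over the value 8 bytes at a time. A self-contained sketch of that fold, assuming an x86-64 target with SSE4.2 and a size that is a multiple of 8 (true for the 128/256-bit integers in question):

#include <cstdint>
#include <cstring>
#include <nmmintrin.h>

/// Fold CRC32C over the bytes of an integer wider than 64 bits.
template <typename T>
uint64_t intHashCRC32Wide(const T & x, uint64_t updated_value)
{
    const char * begin = reinterpret_cast<const char *>(&x);
    for (size_t offset = 0; offset + 8 <= sizeof(T); offset += 8)
    {
        uint64_t word;
        std::memcpy(&word, begin + offset, 8);
        updated_value = _mm_crc32_u64(updated_value, word);
    }
    return updated_value;
}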
#define DEFINE_HASH(T) \

View File

@ -1,3 +1,4 @@
#pragma once
#include <array>
namespace Poco { namespace Net { class IPAddress; }}

View File

@ -1,3 +1,4 @@
#pragma once
/**
* This file contains some using-declarations that define various kinds of
* PODArray.

View File

@ -1,3 +1,4 @@
#pragma once
#include <cstdint>

View File

@ -82,13 +82,6 @@ inline UInt64 getCurrentTimeNanoseconds(clockid_t clock_type = CLOCK_MONOTONIC)
return ts.tv_sec * 1000000000ULL + ts.tv_nsec;
}
inline UInt64 getCurrentTimeMicroseconds()
{
struct timeval tv;
gettimeofday(&tv, nullptr);
return (tv.tv_sec) * 1000000U + (tv.tv_usec);
}
struct RUsageCounters
{
/// In nanoseconds
@ -115,13 +108,6 @@ struct RUsageCounters
hard_page_faults = static_cast<UInt64>(rusage.ru_majflt);
}
static RUsageCounters zeros(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
RUsageCounters res;
res.real_time = real_time_;
return res;
}
static RUsageCounters current(UInt64 real_time_ = getCurrentTimeNanoseconds())
{
::rusage rusage {};

View File

@ -1,3 +1,4 @@
#pragma once
#include <utility>
#include <cstddef>

View File

@ -1,3 +1,4 @@
#pragma once
#if defined(__linux__)
#include <linux/capability.h>

View File

@ -1,3 +1,4 @@
#pragma once
namespace DB
{

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/FunctionFactory.h>
#include <Functions/registerFunctions.h>

View File

@ -279,7 +279,9 @@ int main(int argc, char ** argv)
if (!method || method == 1) test<identity> (n, data.data(), "0: identity");
if (!method || method == 2) test<intHash32> (n, data.data(), "1: intHash32");
#if !defined(__APPLE__) /// The difference in size_t: unsigned long on Linux, unsigned long long on Mac OS.
if (!method || method == 3) test<intHash64> (n, data.data(), "2: intHash64");
#endif
if (!method || method == 4) test<hash3> (n, data.data(), "3: two rounds");
if (!method || method == 5) test<hash4> (n, data.data(), "4: two rounds and two variables");
if (!method || method == 6) test<hash5> (n, data.data(), "5: two rounds with less ops");

View File

@ -150,10 +150,9 @@ Coordination::WatchCallback BackgroundSchedulePoolTaskInfo::getWatchCallback()
}
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric memory_metric_, const char *thread_name_)
BackgroundSchedulePool::BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_)
: size(size_)
, tasks_metric(tasks_metric_)
, memory_metric(memory_metric_)
, thread_name(thread_name_)
{
LOG_INFO(&Poco::Logger::get("BackgroundSchedulePool/" + thread_name), "Create BackgroundSchedulePool with {} threads", size);
@ -249,8 +248,6 @@ void BackgroundSchedulePool::threadFunction()
attachToThreadGroup();
SCOPE_EXIT({ CurrentThread::detachQueryIfNotDetached(); });
if (auto * memory_tracker = CurrentThread::getMemoryTracker())
memory_tracker->setMetric(memory_metric);
while (!shutdown)
{

View File

@ -51,7 +51,7 @@ public:
size_t getNumberOfThreads() const { return size; }
/// thread_name_ cannot be longer than 13 bytes (2 bytes are reserved for the "/D" suffix for delayExecutionThreadFunction())
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, CurrentMetrics::Metric memory_metric_, const char *thread_name_);
BackgroundSchedulePool(size_t size_, CurrentMetrics::Metric tasks_metric_, const char *thread_name_);
~BackgroundSchedulePool();
private:
@ -85,7 +85,6 @@ private:
ThreadGroupStatusPtr thread_group;
CurrentMetrics::Metric tasks_metric;
CurrentMetrics::Metric memory_metric;
std::string thread_name;
void attachToThreadGroup();

View File

@ -1,8 +1,10 @@
#pragma once
#include <common/StringRef.h>
#include <common/unaligned.h>
#include <Core/Types.h>
namespace DB
{
@ -14,8 +16,7 @@ struct BigInt
static StringRef serialize(const T & x, char * pos)
{
//unalignedStore<T>(pos, x);
memcpy(pos, &x, size);
unalignedStore<T>(pos, x);
return StringRef(pos, size);
}
@ -28,20 +29,7 @@ struct BigInt
static T deserialize(const char * pos)
{
//return unalignedLoad<T>(pos);
T res;
memcpy(&res, pos, size);
return res;
}
static std::vector<UInt64> toIntArray(const T & x)
{
std::vector<UInt64> parts(4, 0);
parts[0] = UInt64(x);
parts[1] = UInt64(x >> 64);
parts[2] = UInt64(x >> 128);
parts[4] = UInt64(x >> 192);
return parts;
return unalignedLoad<T>(pos);
}
};
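The hand-rolled memcpy bodies above are folded back into unalignedStore/unalignedLoad. Assuming those helpers are the usual thin memcpy wrappers (as in common/unaligned.h), they amount to:

#include <cstring>
#include <type_traits>

template <typename T>
void unalignedStoreSketch(void * address, const T & src)
{
    static_assert(std::is_trivially_copyable_v<T>, "byte-wise copy requires a trivially copyable type");
    std::memcpy(address, &src, sizeof(src));   /// safe for any alignment
}

template <typename T>
T unalignedLoadSketch(const void * address)
{
    static_assert(std::is_trivially_copyable_v<T>, "byte-wise copy requires a trivially copyable type");
    T res;
    std::memcpy(&res, address, sizeof(res));
    return res;
}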

View File

@ -152,6 +152,12 @@ public:
private:
void eraseImpl(size_t position);
void initializeIndexByName();
/// This is needed to allow function execution over data.
/// It is safe because functions do not change column names, so the index is unaffected.
/// It is temporary.
friend struct ExpressionAction;
friend class ActionsDAG;
};
using Blocks = std::vector<Block>;

View File

@ -60,14 +60,14 @@ public:
using ArrayA = typename ColVecA::Container;
using ArrayB = typename ColVecB::Container;
DecimalComparison(Block & block, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
DecimalComparison(ColumnsWithTypeAndName & data, size_t result, const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
if (!apply(block, result, col_left, col_right))
if (!apply(data, result, col_left, col_right))
throw Exception("Wrong decimal comparison with " + col_left.type->getName() + " and " + col_right.type->getName(),
ErrorCodes::LOGICAL_ERROR);
}
static bool apply(Block & block, size_t result [[maybe_unused]],
static bool apply(ColumnsWithTypeAndName & data, size_t result [[maybe_unused]],
const ColumnWithTypeAndName & col_left, const ColumnWithTypeAndName & col_right)
{
if constexpr (_actual)
@ -77,7 +77,7 @@ public:
c_res = applyWithScale(col_left.column, col_right.column, shift);
if (c_res)
block.getByPosition(result).column = std::move(c_res);
data[result].column = std::move(c_res);
return true;
}
return false;

View File

@ -152,12 +152,14 @@ inline typename DecimalType::NativeType getFractionalPartWithScaleMultiplier(
{
using T = typename DecimalType::NativeType;
T result = decimal.value;
/// There is UB for the minimum integer value here, but it does not matter for Decimals because they do not use the full integer range.
/// In any case, we take the modulo before the sign check, so scale_multiplier > 1 is unaffected.
T result = decimal.value % scale_multiplier;
if constexpr (!keep_sign)
if (result < T(0))
result = -result;
return result % scale_multiplier;
return result;
}
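The reordering matters because negating first can overflow: for the type's minimum value, -result is signed-overflow UB, while taking value % scale_multiplier first bounds the magnitude below scale_multiplier, so the subsequent negation is always safe. A worked illustration:

#include <cstdint>

/// Safe order: reduce magnitude first, then drop the sign.
int64_t fractionalPartAbs(int64_t value, int64_t scale_multiplier)
{
    int64_t result = value % scale_multiplier;  /// now |result| < scale_multiplier
    if (result < 0)
        result = -result;                       /// cannot overflow any more
    return result;
}

/// fractionalPartAbs(-1234567, 1000) == 567
/// fractionalPartAbs(INT64_MIN, 1000) == 808, while negating INT64_MIN directly is UB.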
/** Get fractional part from decimal

View File

@ -902,7 +902,7 @@ public:
{
type_to_method[user_auth_type]->authenticate(user_name, context, mt, address);
mt.send(Messaging::AuthenticationOk(), true);
LOG_INFO(log, "Authentication for user {} was successful.", user_name);
LOG_DEBUG(log, "Authentication for user {} was successful.", user_name);
return;
}

View File

@ -135,6 +135,7 @@ class IColumn;
\
M(Bool, force_index_by_date, 0, "Throw an exception if there is a partition key in a table, and it is not used.", 0) \
M(Bool, force_primary_key, 0, "Throw an exception if there is primary key in a table, and it is not used.", 0) \
M(String, force_data_skipping_indices, "", "Comma separated list of strings or literals with the names of the data skipping indices that should be used during query execution, otherwise an exception will be thrown.", 0) \
\
M(Float, max_streams_to_max_threads_ratio, 1, "Allows you to use more sources than the number of threads - to more evenly distribute work across threads. It is assumed that this is a temporary solution, since it will be possible in the future to make the number of sources equal to the number of threads, but for each source to dynamically select available work for itself.", 0) \
M(Float, max_streams_multiplier_for_merge_tables, 5, "Ask more streams when reading from Merge table. Streams will be spread across tables that Merge table will use. This allows more even distribution of work across threads and especially helpful when merged tables differ in size.", 0) \
@ -158,6 +159,7 @@ class IColumn;
\
M(UInt64, insert_quorum, 0, "For INSERT queries in the replicated table, wait writing for the specified number of replicas and linearize the addition of the data. 0 - disabled.", 0) \
M(Milliseconds, insert_quorum_timeout, 600000, "", 0) \
M(Bool, insert_quorum_parallel, false, "For quorum INSERT queries - enable to make parallel inserts without linearizability", 0) \
M(UInt64, select_sequential_consistency, 0, "For SELECT queries from the replicated table, throw an exception if the replica does not have a chunk written with the quorum; do not read the parts that have not yet been written with the quorum.", 0) \
M(UInt64, table_function_remote_max_addresses, 1000, "The maximum number of different shards and the maximum number of replicas of one shard in the `remote` function.", 0) \
M(Milliseconds, read_backoff_min_latency_ms, 1000, "Setting to reduce the number of threads in case of slow reads. Pay attention only to reads that took at least that much time.", 0) \
@ -389,14 +391,6 @@ class IColumn;
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\
M(Bool, allow_experimental_low_cardinality_type, true, "Obsolete setting, does nothing. Will be removed after 2019-08-13", 0) \
M(Bool, compile, false, "Whether query compilation is enabled. Will be removed after 2020-03-13", 0) \
M(UInt64, min_count_to_compile, 0, "Obsolete setting, does nothing. Will be removed after 2020-03-13", 0) \
M(Bool, allow_experimental_multiple_joins_emulation, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, allow_experimental_cross_to_join_conversion, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, allow_experimental_data_skipping_indices, true, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, merge_tree_uniform_read_distribution, true, "Obsolete setting, does nothing. Will be removed after 2020-05-20", 0) \
M(UInt64, mark_cache_min_lifetime, 0, "Obsolete setting, does nothing. Will be removed after 2020-05-31", 0) \
M(Bool, partial_merge_join, false, "Obsolete. Use join_algorithm='prefer_partial_merge' instead.", 0) \
M(UInt64, max_memory_usage_for_all_queries, 0, "Obsolete. Will be removed after 2020-10-20", 0) \
M(UInt64, multiple_joins_rewriter_version, 0, "Obsolete setting, does nothing. Will be removed after 2021-03-31", 0) \
@ -470,6 +464,7 @@ class IColumn;
M(Bool, output_format_write_statistics, true, "Write statistics about read rows, bytes, time elapsed in suitable output formats.", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, output_format_pretty_row_numbers, false, "Add row numbers before each row for pretty output format", 0) \
M(Bool, enable_global_with_statement, false, "Propagate WITH statements to UNION queries and all subqueries", 0) \
#define LIST_OF_SETTINGS(M) \
COMMON_SETTINGS(M) \

View File

@ -1,3 +1,4 @@
#pragma once
/* Copyright (c) 2018 BlackBerry Limited
Licensed under the Apache License, Version 2.0 (the "License");
@ -9,7 +10,6 @@ distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. */
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Processors/Sources/SourceWithProgress.h>

View File

@ -1,192 +0,0 @@
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnsCommon.h>
#include <Columns/ColumnConst.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/ExpressionActions.h>
#include <Common/typeid_cast.h>
#include <DataStreams/FilterBlockInputStream.h>
namespace DB
{
namespace ErrorCodes
{
}
FilterBlockInputStream::FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_)
: remove_filter(remove_filter_)
, expression(std::move(expression_))
, filter_column_name(std::move(filter_column_name_))
{
children.push_back(input);
/// Determine position of filter column.
header = input->getHeader();
expression->execute(header);
filter_column = header.getPositionByName(filter_column_name);
auto & column_elem = header.safeGetByPosition(filter_column);
/// Isn't the filter already constant?
if (column_elem.column)
constant_filter_description = ConstantFilterDescription(*column_elem.column);
if (!constant_filter_description.always_false
&& !constant_filter_description.always_true)
{
/// Replace the filter column to a constant with value 1.
FilterDescription filter_description_check(*column_elem.column);
column_elem.column = column_elem.type->createColumnConst(header.rows(), 1u);
}
if (remove_filter)
header.erase(filter_column_name);
}
String FilterBlockInputStream::getName() const { return "Filter"; }
Block FilterBlockInputStream::getTotals()
{
totals = children.back()->getTotals();
expression->execute(totals);
return totals;
}
Block FilterBlockInputStream::getHeader() const
{
return header;
}
Block FilterBlockInputStream::readImpl()
{
Block res;
if (constant_filter_description.always_false)
return removeFilterIfNeed(std::move(res));
if (expression->checkColumnIsAlwaysFalse(filter_column_name))
return {};
/// Until non-empty block after filtering or end of stream.
while (true)
{
res = children.back()->read();
if (!res)
return res;
expression->execute(res);
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
size_t columns = res.columns();
ColumnPtr column = res.safeGetByPosition(filter_column).column;
/** It happens that at the stage of expression analysis (in sample_block) the constant columns have not been calculated yet,
* and are only calculated now. That is, not all cases are covered by the code above.
* This happens if the function returns a constant for a non-constant argument.
* For example, `ignore` function.
*/
constant_filter_description = ConstantFilterDescription(*column);
if (constant_filter_description.always_false)
{
res.clear();
return res;
}
if (constant_filter_description.always_true)
return removeFilterIfNeed(std::move(res));
FilterDescription filter_and_holder(*column);
/** Let's find out how many rows will be in result.
* To do this, we filter out the first non-constant column
* or calculate number of set bytes in the filter.
*/
size_t first_non_constant_column = 0;
for (size_t i = 0; i < columns; ++i)
{
if (!isColumnConst(*res.safeGetByPosition(i).column))
{
first_non_constant_column = i;
if (first_non_constant_column != static_cast<size_t>(filter_column))
break;
}
}
size_t filtered_rows = 0;
if (first_non_constant_column != static_cast<size_t>(filter_column))
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(first_non_constant_column);
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
filtered_rows = current_column.column->size();
}
else
{
filtered_rows = countBytesInFilter(*filter_and_holder.data);
}
/// If the current block is completely filtered out, let's move on to the next one.
if (filtered_rows == 0)
continue;
/// If all the rows pass through the filter.
if (filtered_rows == filter_and_holder.data->size())
{
/// Replace the column with the filter by a constant.
res.safeGetByPosition(filter_column).column = res.safeGetByPosition(filter_column).type->createColumnConst(filtered_rows, 1u);
/// No need to touch the rest of the columns.
return removeFilterIfNeed(std::move(res));
}
/// Filter the rest of the columns.
for (size_t i = 0; i < columns; ++i)
{
ColumnWithTypeAndName & current_column = res.safeGetByPosition(i);
if (i == static_cast<size_t>(filter_column))
{
/// The column with filter itself is replaced with a column with a constant `1`, since after filtering, nothing else will remain.
/// NOTE User could pass column with something different than 0 and 1 for filter.
/// Example:
/// SELECT materialize(100) AS x WHERE x
/// will work incorrectly.
current_column.column = current_column.type->createColumnConst(filtered_rows, 1u);
continue;
}
if (i == first_non_constant_column)
continue;
if (isColumnConst(*current_column.column))
current_column.column = current_column.column->cut(0, filtered_rows);
else
current_column.column = current_column.column->filter(*filter_and_holder.data, -1);
}
return removeFilterIfNeed(std::move(res));
}
}
Block FilterBlockInputStream::removeFilterIfNeed(Block && block) const
{
if (block && remove_filter)
block.erase(static_cast<size_t>(filter_column));
return std::move(block);
}
}

View File

@ -1,46 +0,0 @@
#pragma once
#include <DataStreams/IBlockInputStream.h>
#include <Columns/FilterDescription.h>
namespace DB
{
class ExpressionActions;
/** Implements WHERE, HAVING operations.
* A stream of blocks and an expression, which adds to the block one ColumnUInt8 column containing the filtering conditions, are passed as input.
* The expression is evaluated and a stream of blocks is returned, which contains only the filtered rows.
*/
class FilterBlockInputStream : public IBlockInputStream
{
private:
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
public:
FilterBlockInputStream(const BlockInputStreamPtr & input, ExpressionActionsPtr expression_,
String filter_column_name_, bool remove_filter_ = false);
String getName() const override;
Block getTotals() override;
Block getHeader() const override;
protected:
Block readImpl() override;
bool remove_filter;
private:
ExpressionActionsPtr expression;
Block header;
String filter_column_name;
ssize_t filter_column;
ConstantFilterDescription constant_filter_description;
Block removeFilterIfNeed(Block && block) const;
};
}

View File

@ -7,7 +7,6 @@ namespace DB
{
/** A stream of blocks from which you can read one block.
* Also see BlocksListBlockInputStream.
*/
class OneBlockInputStream : public IBlockInputStream
{

View File

@ -1,6 +1,7 @@
#include <DataStreams/ConvertingBlockInputStream.h>
#include <DataStreams/PushingToViewsBlockOutputStream.h>
#include <DataStreams/SquashingBlockInputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataTypes/NestedUtils.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterInsertQuery.h>

View File

@ -2,7 +2,6 @@
#include <DataStreams/copyData.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OneBlockInputStream.h>
#include <DataStreams/MaterializingBlockInputStream.h>
#include <Storages/StorageMaterializedView.h>

View File

@ -150,7 +150,7 @@ void TTLBlockInputStream::readSuffixImpl()
data_part->expired_columns = std::move(empty_columns);
if (rows_removed)
LOG_INFO(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
LOG_DEBUG(log, "Removed {} rows with expired TTL from part {}", rows_removed, data_part->name);
}
void TTLBlockInputStream::removeRowsWithExpiredTableTTL(Block & block)

View File

@ -1,4 +0,0 @@
set(SRCS)
add_executable (finish_sorting_stream finish_sorting_stream.cpp ${SRCS})
target_link_libraries (finish_sorting_stream PRIVATE clickhouse_aggregate_functions dbms)

View File

@ -1,132 +0,0 @@
#include <iostream>
#include <iomanip>
#include <pcg_random.hpp>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnsNumber.h>
#include <Core/SortDescription.h>
#include <Interpreters/sortBlock.h>
#include <Processors/Transforms/FinishSortingTransform.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Executors/PipelineExecutingBlockInputStream.h>
#include <Processors/Transforms/MergeSortingTransform.h>
#include <DataStreams/BlocksListBlockInputStream.h>
using namespace DB;
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
}
int main(int argc, char ** argv)
{
pcg64 rng;
try
{
size_t m = argc >= 2 ? std::stol(argv[1]) : 2;
size_t n = argc >= 3 ? std::stol(argv[2]) : 10;
SortDescription sort_descr;
sort_descr.emplace_back("col1", 1, 1);
Block block_header;
BlocksList blocks;
for (size_t t = 0; t < m; ++t)
{
Block block;
for (size_t i = 0; i < 2; ++i)
{
ColumnWithTypeAndName column;
column.name = "col" + std::to_string(i + 1);
column.type = std::make_shared<DataTypeInt32>();
auto col = ColumnInt32::create();
auto & vec = col->getData();
vec.resize(n);
for (size_t j = 0; j < n; ++j)
vec[j] = rng() % 10;
column.column = std::move(col);
block.insert(column);
}
if (!block_header)
block_header = block.cloneEmpty();
sortBlock(block, sort_descr);
blocks.emplace_back(std::move(block));
}
auto blocks_stream = std::make_shared<BlocksListBlockInputStream>(std::move(blocks));
Pipe source(std::make_shared<SourceFromInputStream>(std::move(blocks_stream)));
QueryPipeline pipeline;
pipeline.init(std::move(source));
pipeline.addTransform(std::make_shared<MergeSortingTransform>(pipeline.getHeader(), sort_descr, n, 0, 0, 0, nullptr, 0));
SortDescription sort_descr_final;
sort_descr_final.emplace_back("col1", 1, 1);
sort_descr_final.emplace_back("col2", 1, 1);
pipeline.addSimpleTransform([&](const Block & header)
{
return std::make_shared<FinishSortingTransform>(header, sort_descr, sort_descr_final, n, 0);
});
auto stream = std::make_shared<PipelineExecutingBlockInputStream>(std::move(pipeline));
{
Stopwatch stopwatch;
stopwatch.start();
Block res_block = block_header;
while (Block block = stream->read())
{
for (size_t i = 0; i < block.columns(); ++i)
{
MutableColumnPtr ptr = IColumn::mutate(std::move(res_block.getByPosition(i).column));
ptr->insertRangeFrom(*block.getByPosition(i).column.get(), 0, block.rows());
}
}
if (res_block.rows() != n * m)
throw Exception("Result block size mismatch", ErrorCodes::LOGICAL_ERROR);
const auto & columns = res_block.getColumns();
for (size_t i = 1; i < res_block.rows(); ++i)
for (const auto & col : columns)
{
int res = col->compareAt(i - 1, i, *col, 1);
if (res < 0)
break;
else if (res > 0)
throw Exception("Result stream not sorted", ErrorCodes::LOGICAL_ERROR);
}
stopwatch.stop();
std::cout << std::fixed << std::setprecision(2)
<< "Elapsed " << stopwatch.elapsedSeconds() << " sec."
<< ", " << n / stopwatch.elapsedSeconds() << " rows/sec."
<< std::endl;
}
}
catch (const Exception & e)
{
std::cerr << e.displayText() << std::endl;
return -1;
}
return 0;
}

View File

@ -25,7 +25,6 @@ SRCS(
DistinctSortedBlockInputStream.cpp
ExecutionSpeedLimits.cpp
ExpressionBlockInputStream.cpp
FilterBlockInputStream.cpp
finalizeBlock.cpp
IBlockInputStream.cpp
InputStreamFromASTInsertQuery.cpp

View File

@ -70,17 +70,6 @@ std::pair<std::string, std::string> splitName(const std::string & name)
return {{ begin, first_end }, { second_begin, end }};
}
std::string createCommaSeparatedStringFrom(const Names & names)
{
std::ostringstream ss;
if (!names.empty())
{
std::copy(names.begin(), std::prev(names.end()), std::ostream_iterator<std::string>(ss, ", "));
ss << names.back();
}
return ss.str();
}
std::string extractTableName(const std::string & nested_name)
{

View File

@ -13,8 +13,6 @@ namespace Nested
std::pair<std::string, std::string> splitName(const std::string & name);
std::string createCommaSeparatedStringFrom(const Names & names);
/// Returns the prefix of the name up to the first '.', or the name unchanged if there is no dot.
std::string extractTableName(const std::string & nested_name);

View File

@ -1,3 +1,4 @@
#pragma once
#include <Databases/DatabaseOnDisk.h>
#include <boost/smart_ptr/atomic_shared_ptr.hpp>
#include <ext/scope_guard.h>

View File

@ -1,3 +1,4 @@
#pragma once
#include "CacheDictionary.h"
#include <Columns/ColumnsNumber.h>

View File

@ -173,7 +173,6 @@ namespace
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
};
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <Disks/DiskLocal.h>
#include <Disks/DiskMemory.h>
#include <Disks/IDisk.h>

View File

@ -6,6 +6,7 @@
#include <Core/DecimalFunctions.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <Functions/IFunctionImpl.h>
#include <Common/Exception.h>
#include <common/DateLUTImpl.h>
@ -115,29 +116,29 @@ template <typename FromDataType, typename ToDataType>
struct CustomWeekTransformImpl
{
template <typename Transform>
static void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, Transform transform = {})
static void execute(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, Transform transform = {})
{
const auto op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>{std::move(transform)};
UInt8 week_mode = DEFAULT_WEEK_MODE;
if (arguments.size() > 1)
{
if (const auto week_mode_column = checkAndGetColumnConst<ColumnUInt8>(block.getByPosition(arguments[1]).column.get()))
if (const auto week_mode_column = checkAndGetColumnConst<ColumnUInt8>(block[arguments[1]].column.get()))
week_mode = week_mode_column->getValue<UInt8>();
}
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 2, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
if (const auto * sources = checkAndGetColumn<typename FromDataType::ColumnType>(source_col.get()))
{
auto col_to = ToDataType::ColumnType::create();
op.vector(sources->getData(), col_to->getData(), week_mode, time_zone);
block.getByPosition(result).column = std::move(col_to);
block[result].column = std::move(col_to);
}
else
{
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function "
"Illegal column " + block[arguments[0]].column->getName() + " of first argument of function "
+ Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -6,6 +6,7 @@
#include <Columns/ColumnVector.h>
#include <Columns/ColumnDecimal.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunctionImpl.h>
#include <Functions/extractTimeZoneFromFunctionArguments.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -682,25 +683,25 @@ struct Transformer
template <typename FromDataType, typename ToDataType, typename Transform>
struct DateTimeTransformImpl
{
static void execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, const Transform & transform = {})
static void execute(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/, const Transform & transform = {})
{
using Op = Transformer<typename FromDataType::FieldType, typename ToDataType::FieldType, Transform>;
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 1, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
if (const auto * sources = checkAndGetColumn<typename FromDataType::ColumnType>(source_col.get()))
{
auto mutable_result_col = block.getByPosition(result).type->createColumn();
auto mutable_result_col = block[result].type->createColumn();
auto * col_to = assert_cast<typename ToDataType::ColumnType *>(mutable_result_col.get());
Op::vector(sources->getData(), col_to->getData(), time_zone, transform);
block.getByPosition(result).column = std::move(mutable_result_col);
block[result].column = std::move(mutable_result_col);
}
else
{
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
throw Exception("Illegal column " + block[arguments[0]].column->getName()
+ " of first argument of function " + Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}

View File

@ -1,3 +1,4 @@
#pragma once
#if !defined(ARCADIA_BUILD)
# include "config_functions.h"
#endif
@ -92,12 +93,12 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const ColumnPtr column_string = block.getByPosition(arguments[0]).column;
const ColumnPtr column_string = block[arguments[0]].column;
const ColumnString * input = checkAndGetColumn<ColumnString>(column_string.get());
if (!input)
throw Exception(
"Illegal column " + block.getByPosition(arguments[0]).column->getName() + " of first argument of function " + getName(),
"Illegal column " + block[arguments[0]].column->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_COLUMN);
auto dst_column = ColumnString::create();
@ -165,7 +166,7 @@ public:
dst_data.resize(dst_pos - dst);
block.getByPosition(result).column = std::move(dst_column);
block[result].column = std::move(dst_column);
}
};
}

View File

@ -616,14 +616,14 @@ class FunctionBinaryArithmetic : public IFunction
void executeAggregateMultiply(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
{
ColumnNumbers new_arguments = arguments;
if (WhichDataType(block.getByPosition(new_arguments[1]).type).isAggregateFunction())
if (WhichDataType(block[new_arguments[1]].type).isAggregateFunction())
std::swap(new_arguments[0], new_arguments[1]);
if (!isColumnConst(*block.getByPosition(new_arguments[1]).column))
throw Exception{"Illegal column " + block.getByPosition(new_arguments[1]).column->getName()
if (!isColumnConst(*block[new_arguments[1]].column))
throw Exception{"Illegal column " + block[new_arguments[1]].column->getName()
+ " of argument of aggregation state multiply. Should be integer constant", ErrorCodes::ILLEGAL_COLUMN};
const IColumn & agg_state_column = *block.getByPosition(new_arguments[0]).column;
const IColumn & agg_state_column = *block[new_arguments[0]].column;
bool agg_state_is_const = isColumnConst(agg_state_column);
const ColumnAggregateFunction & column = typeid_cast<const ColumnAggregateFunction &>(
agg_state_is_const ? assert_cast<const ColumnConst &>(agg_state_column).getDataColumn() : agg_state_column);
@ -647,7 +647,7 @@ class FunctionBinaryArithmetic : public IFunction
auto & vec_to = column_to->getData();
auto & vec_from = column_from->getData();
UInt64 m = typeid_cast<const ColumnConst *>(block.getByPosition(new_arguments[1]).column.get())->getValue<UInt64>();
UInt64 m = typeid_cast<const ColumnConst *>(block[new_arguments[1]].column.get())->getValue<UInt64>();
// Since we merge the function states by ourselves, we have to have an
// Arena for this. Pass it to the resulting column so that the arena
@ -674,16 +674,16 @@ class FunctionBinaryArithmetic : public IFunction
}
if (agg_state_is_const)
block.getByPosition(result).column = ColumnConst::create(std::move(column_to), input_rows_count);
block[result].column = ColumnConst::create(std::move(column_to), input_rows_count);
else
block.getByPosition(result).column = std::move(column_to);
block[result].column = std::move(column_to);
}
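The multiply above is implemented by merging the aggregate state the requested number of times, which is valid for states that distribute over merge (sums and the like). A toy sketch with an illustrative sum state, not the real IAggregateFunction interface:

#include <cstdint>

struct SumState
{
    int64_t value = 0;
    void merge(const SumState & other) { value += other.value; }
};

/// Multiplying a state by m == merging m copies of it into an empty state.
SumState multiplyState(const SumState & state, uint64_t m)
{
    SumState res;
    for (uint64_t i = 0; i < m; ++i)
        res.merge(state);
    return res;
}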
/// Merge two aggregation states together.
void executeAggregateAddition(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const
{
const IColumn & lhs_column = *block.getByPosition(arguments[0]).column;
const IColumn & rhs_column = *block.getByPosition(arguments[1]).column;
const IColumn & lhs_column = *block[arguments[0]].column;
const IColumn & rhs_column = *block[arguments[1]].column;
bool lhs_is_const = isColumnConst(lhs_column);
bool rhs_is_const = isColumnConst(rhs_column);
@ -707,9 +707,9 @@ class FunctionBinaryArithmetic : public IFunction
}
if (lhs_is_const && rhs_is_const)
block.getByPosition(result).column = ColumnConst::create(std::move(column_to), input_rows_count);
block[result].column = ColumnConst::create(std::move(column_to), input_rows_count);
else
block.getByPosition(result).column = std::move(column_to);
block[result].column = std::move(column_to);
}
void executeDateTimeIntervalPlusMinus(Block & block, const ColumnNumbers & arguments,
@ -718,19 +718,19 @@ class FunctionBinaryArithmetic : public IFunction
ColumnNumbers new_arguments = arguments;
/// Interval argument must be second.
if (WhichDataType(block.getByPosition(arguments[1]).type).isDateOrDateTime())
if (WhichDataType(block[arguments[1]].type).isDateOrDateTime())
std::swap(new_arguments[0], new_arguments[1]);
/// Change interval argument type to its representation
Block new_block = block;
new_block.getByPosition(new_arguments[1]).type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
new_block[new_arguments[1]].type = std::make_shared<DataTypeNumber<DataTypeInterval::FieldType>>();
ColumnsWithTypeAndName new_arguments_with_type_and_name =
{new_block.getByPosition(new_arguments[0]), new_block.getByPosition(new_arguments[1])};
{new_block[new_arguments[0]], new_block[new_arguments[1]]};
auto function = function_builder->build(new_arguments_with_type_and_name);
function->execute(new_block, new_arguments, result, input_rows_count);
block.getByPosition(result).column = new_block.getByPosition(result).column;
block[result].column = new_block[result].column;
}
public:
@ -855,8 +855,8 @@ public:
{
using OpImpl = FixedStringOperationImpl<Op<UInt8, UInt8>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
auto col_left_raw = block[arguments[0]].column.get();
auto col_right_raw = block[arguments[1]].column.get();
if (auto col_left_const = checkAndGetColumnConst<ColumnFixedString>(col_left_raw))
{
if (auto col_right_const = checkAndGetColumnConst<ColumnFixedString>(col_right_raw))
@ -872,7 +872,7 @@ public:
col_right->getChars().data(),
out_chars.data(),
out_chars.size());
block.getByPosition(result).column = ColumnConst::create(std::move(col_res), block.rows());
block[result].column = ColumnConst::create(std::move(col_res), col_left_raw->size());
return true;
}
}
@ -922,7 +922,7 @@ public:
out_chars.size(),
col_left->getN());
}
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
return false;
@ -944,8 +944,8 @@ public:
using ColVecT1 = std::conditional_t<IsDecimalNumber<T1>, ColumnDecimal<T1>, ColumnVector<T1>>;
using ColVecResult = std::conditional_t<IsDecimalNumber<ResultType>, ColumnDecimal<ResultType>, ColumnVector<ResultType>>;
auto col_left_raw = block.getByPosition(arguments[0]).column.get();
auto col_right_raw = block.getByPosition(arguments[1]).column.get();
auto col_left_raw = block[arguments[0]].column.get();
auto col_right_raw = block[arguments[1]].column.get();
auto col_left_const = checkAndGetColumnConst<ColVecT0>(col_left_raw);
auto col_right_const = checkAndGetColumnConst<ColVecT1>(col_right_raw);
@ -981,14 +981,14 @@ public:
OpImplCheck::template constantConstant<dec_a, dec_b>(const_a, const_b, scale_a, scale_b) :
OpImpl::template constantConstant<dec_a, dec_b>(const_a, const_b, scale_a, scale_b);
block.getByPosition(result).column = ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
block[result].column = ResultDataType(type.getPrecision(), type.getScale()).createColumnConst(
col_left_const->size(), toField(res, type.getScale()));
return true;
}
col_res = ColVecResult::create(0, type.getScale());
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
vec_res.resize(col_left_raw->size());
if (col_left && col_right)
{
@ -1026,13 +1026,13 @@ public:
if (col_left_const && col_right_const)
{
auto res = OpImpl::constantConstant(col_left_const->template getValue<T0>(), col_right_const->template getValue<T1>());
block.getByPosition(result).column = ResultDataType().createColumnConst(col_left_const->size(), toField(res));
block[result].column = ResultDataType().createColumnConst(col_left_const->size(), toField(res));
return true;
}
col_res = ColVecResult::create();
auto & vec_res = col_res->getData();
vec_res.resize(block.rows());
vec_res.resize(col_left_raw->size());
if (col_left && col_right)
{
@ -1050,7 +1050,7 @@ public:
return false;
}
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
return true;
}
return false;
@ -1059,14 +1059,14 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
/// Special case when multiply aggregate function state
if (isAggregateMultiply(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
if (isAggregateMultiply(block[arguments[0]].type, block[arguments[1]].type))
{
executeAggregateMultiply(block, arguments, result, input_rows_count);
return;
}
/// Special case - addition of two aggregate functions states
if (isAggregateAddition(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type))
if (isAggregateAddition(block[arguments[0]].type, block[arguments[1]].type))
{
executeAggregateAddition(block, arguments, result, input_rows_count);
return;
@ -1074,14 +1074,14 @@ public:
/// Special case when the function is plus or minus, one of arguments is Date/DateTime and another is Interval.
if (auto function_builder
= getFunctionForIntervalArithmetic(block.getByPosition(arguments[0]).type, block.getByPosition(arguments[1]).type, context))
= getFunctionForIntervalArithmetic(block[arguments[0]].type, block[arguments[1]].type, context))
{
executeDateTimeIntervalPlusMinus(block, arguments, result, input_rows_count, function_builder);
return;
}
const auto & left_argument = block.getByPosition(arguments[0]);
const auto & right_argument = block.getByPosition(arguments[1]);
const auto & left_argument = block[arguments[0]];
const auto & right_argument = block[arguments[1]];
auto * left_generic = left_argument.type.get();
auto * right_generic = right_argument.type.get();
bool valid = castBothTypes(left_generic, right_generic, [&](const auto & left, const auto & right)
@ -1171,6 +1171,7 @@ class FunctionBinaryArithmeticWithConstants : public FunctionBinaryArithmetic<Op
public:
using Base = FunctionBinaryArithmetic<Op, Name, valid_on_default_arguments>;
using Monotonicity = typename Base::Monotonicity;
using Block = typename Base::Block;
static FunctionPtr create(
const ColumnWithTypeAndName & left_,
@ -1194,21 +1195,23 @@ public:
{
if (left.column && isColumnConst(*left.column) && arguments.size() == 1)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.column->cloneResized(input_rows_count), left.type, left.name},
block.getByPosition(arguments[0]),
block.getByPosition(result)};
block[arguments[0]],
block[result]};
Base::executeImpl(block_with_constant, {0, 1}, 2, input_rows_count);
block.getByPosition(result) = block_with_constant.getByPosition(2);
block[result] = block_with_constant[2];
}
else if (right.column && isColumnConst(*right.column) && arguments.size() == 1)
{
Block block_with_constant
= {block.getByPosition(arguments[0]),
ColumnsWithTypeAndName block_with_constant
= {block[arguments[0]],
{right.column->cloneResized(input_rows_count), right.type, right.name},
block.getByPosition(result)};
block[result]};
Base::executeImpl(block_with_constant, {0, 1}, 2, input_rows_count);
block.getByPosition(result) = block_with_constant.getByPosition(2);
block[result] = block_with_constant[2];
}
else
Base::executeImpl(block, arguments, result, input_rows_count);
@ -1242,13 +1245,14 @@ public:
{
auto transform = [&](const Field & point)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.column->cloneResized(1), left.type, left.name},
{right.type->createColumnConst(1, point), right.type, right.name},
{nullptr, return_type, ""}};
Base::executeImpl(block_with_constant, {0, 1}, 2, 1);
Field point_transformed;
block_with_constant.getByPosition(2).column->get(0, point_transformed);
block_with_constant[2].column->get(0, point_transformed);
return point_transformed;
};
transform(left_point);
@ -1277,13 +1281,14 @@ public:
{
auto transform = [&](const Field & point)
{
Block block_with_constant
ColumnsWithTypeAndName block_with_constant
= {{left.type->createColumnConst(1, point), left.type, left.name},
{right.column->cloneResized(1), right.type, right.name},
{nullptr, return_type, ""}};
Base::executeImpl(block_with_constant, {0, 1}, 2, 1);
Field point_transformed;
block_with_constant.getByPosition(2).column->get(0, point_transformed);
block_with_constant[2].column->get(0, point_transformed);
return point_transformed;
};

View File

@ -56,7 +56,7 @@ public:
void executeImpl(Block & block , const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const auto value_col = block.getByPosition(arguments.front()).column.get();
const auto value_col = block[arguments.front()].column.get();
if (!execute<UInt8>(block, arguments, result, value_col)
&& !execute<UInt16>(block, arguments, result, value_col)
@ -98,7 +98,7 @@ private:
out[i] = Impl::apply(val[i], mask[i]);
}
block.getByPosition(result).column = std::move(out_col);
block[result].column = std::move(out_col);
return true;
}
else if (const auto value_col_const = checkAndGetColumnConst<ColumnVector<T>>(value_col_untyped))
@ -110,7 +110,7 @@ private:
if (is_const)
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(size, toField(Impl::apply(val, const_mask)));
block[result].column = block[result].type->createColumnConst(size, toField(Impl::apply(val, const_mask)));
}
else
{
@ -122,7 +122,7 @@ private:
for (const auto i : ext::range(0, size))
out[i] = Impl::apply(val, mask[i]);
block.getByPosition(result).column = std::move(out_col);
block[result].column = std::move(out_col);
}
return true;
@ -139,7 +139,7 @@ private:
for (const auto i : ext::range(1, arguments.size()))
{
if (auto pos_col_const = checkAndGetColumnConst<ColumnVector<ValueType>>(block.getByPosition(arguments[i]).column.get()))
if (auto pos_col_const = checkAndGetColumnConst<ColumnVector<ValueType>>(block[arguments[i]].column.get()))
{
const auto pos = pos_col_const->getUInt(0);
if (pos < 8 * sizeof(ValueType))
@ -162,7 +162,7 @@ private:
for (const auto i : ext::range(1, arguments.size()))
{
const auto pos_col = block.getByPosition(arguments[i]).column.get();
const auto pos_col = block[arguments[i]].column.get();
if (!addToMaskImpl<UInt8>(mask, pos_col)
&& !addToMaskImpl<UInt16>(mask, pos_col)

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -97,7 +98,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -114,7 +115,7 @@ public:
}
else
throw Exception(
"Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
"Illegal type " + block[arguments[0]].type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

View File

@ -1,3 +1,4 @@
#pragma once
#include <common/DateLUTImpl.h>
#include <DataTypes/DataTypeDate.h>
@ -304,7 +305,7 @@ private:
template <typename FromDataType, typename ToDataType, typename Transform>
struct DateTimeAddIntervalImpl
{
static void execute(Transform transform, Block & block, const ColumnNumbers & arguments, size_t result)
static void execute(Transform transform, ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result)
{
using FromValueType = typename FromDataType::FieldType;
using FromColumnType = typename FromDataType::ColumnType;
@ -314,14 +315,14 @@ struct DateTimeAddIntervalImpl
const DateLUTImpl & time_zone = extractTimeZoneFromFunctionArguments(block, arguments, 2, 0);
const ColumnPtr source_col = block.getByPosition(arguments[0]).column;
const ColumnPtr source_col = block[arguments[0]].column;
auto result_col = block.getByPosition(result).type->createColumn();
auto result_col = block[result].type->createColumn();
auto col_to = assert_cast<ToColumnType *>(result_col.get());
if (const auto * sources = checkAndGetColumn<FromColumnType>(source_col.get()))
{
const IColumn & delta_column = *block.getByPosition(arguments[1]).column;
const IColumn & delta_column = *block[arguments[1]].column;
if (const auto * delta_const_column = typeid_cast<const ColumnConst *>(&delta_column))
op.vectorConstant(sources->getData(), col_to->getData(), delta_const_column->getInt(0), time_zone);
@ -333,16 +334,16 @@ struct DateTimeAddIntervalImpl
op.constantVector(
sources_const->template getValue<FromValueType>(),
col_to->getData(),
*block.getByPosition(arguments[1]).column, time_zone);
*block[arguments[1]].column, time_zone);
}
else
{
throw Exception("Illegal column " + block.getByPosition(arguments[0]).column->getName()
throw Exception("Illegal column " + block[arguments[0]].column->getName()
+ " of first argument of function " + Transform::name,
ErrorCodes::ILLEGAL_COLUMN);
}
block.getByPosition(result).column = std::move(result_col);
block[result].column = std::move(result_col);
}
};
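execute() above dispatches on which side is constant: vector source with constant delta, vector source with vector delta, or constant source with vector delta. A scalar-typed sketch of the two asymmetric cases, with illustrative names:

#include <cstddef>
#include <cstdint>
#include <vector>

void vectorConstant(const std::vector<int64_t> & src, std::vector<int64_t> & dst, int64_t delta)
{
    dst.resize(src.size());
    for (size_t i = 0; i < src.size(); ++i)
        dst[i] = src[i] + delta;            /// one delta applied to every row
}

void constantVector(int64_t src, std::vector<int64_t> & dst, const std::vector<int64_t> & delta)
{
    dst.resize(delta.size());
    for (size_t i = 0; i < delta.size(); ++i)
        dst[i] = src + delta[i];            /// one source shifted by every delta
}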
@ -464,7 +465,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -483,7 +484,7 @@ public:
Transform{datetime64_type->getScale()}, block, arguments, result);
}
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of first argument of function " + getName(),
throw Exception("Illegal type " + block[arguments[0]].type->getName() + " of first argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}
};

View File

@ -1,3 +1,4 @@
#pragma once
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Functions/IFunctionImpl.h>
@ -96,7 +97,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IDataType * from_type = block.getByPosition(arguments[0]).type.get();
const IDataType * from_type = block[arguments[0]].type.get();
WhichDataType which(from_type);
if (which.isDate())
@ -110,7 +111,7 @@ public:
DateTimeTransformImpl<DataTypeDateTime64, ToDataType, decltype(transformer)>::execute(block, arguments, result, input_rows_count, transformer);
}
else
throw Exception("Illegal type " + block.getByPosition(arguments[0]).type->getName() + " of argument of function " + getName(),
throw Exception("Illegal type " + block[arguments[0]].type->getName() + " of argument of function " + getName(),
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
}

View File

@ -36,7 +36,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) const override
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(
block[result].column = block[result].type->createColumnConst(
input_rows_count, getFQDNOrHostName())->convertToFullColumnIfConst();
}
};
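getFQDNOrHostName() yields one value for the whole chunk, so the function builds a const column and then materializes it. Assuming the usual const-column semantics (a single value plus a row count), materialization amounts to:

#include <cstddef>
#include <string>
#include <vector>

struct ConstStringColumn
{
    std::string value;
    size_t rows;
};

/// Expand the single stored value into `rows` identical entries,
/// like convertToFullColumnIfConst() on a ColumnConst.
std::vector<std::string> convertToFull(const ConstStringColumn & c)
{
    return std::vector<std::string>(c.rows, c.value);
}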

View File

@ -2,6 +2,7 @@
#include <Functions/IFunctionAdaptors.h>
#include <Common/IFactoryWithAliases.h>
#include <Interpreters/Context.h>
#include <functional>
#include <memory>

View File

@ -51,14 +51,14 @@ Columns convertConstTupleToConstantElements(const ColumnConst & column)
}
static Block createBlockWithNestedColumnsImpl(const Block & block, const std::unordered_set<size_t> & args)
static ColumnsWithTypeAndName createBlockWithNestedColumnsImpl(const ColumnsWithTypeAndName & columns, const std::unordered_set<size_t> & args)
{
Block res;
size_t columns = block.columns();
ColumnsWithTypeAndName res;
size_t num_columns = columns.size();
for (size_t i = 0; i < columns; ++i)
for (size_t i = 0; i < num_columns; ++i)
{
const auto & col = block.getByPosition(i);
const auto & col = columns[i];
if (args.count(i) && col.type->isNullable())
{
@ -66,40 +66,40 @@ static Block createBlockWithNestedColumnsImpl(const Block & block, const std::un
if (!col.column)
{
res.insert({nullptr, nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{nullptr, nested_type, col.name});
}
else if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*col.column))
{
const auto & nested_col = nullable->getNestedColumnPtr();
res.insert({nested_col, nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{nested_col, nested_type, col.name});
}
else if (const auto * const_column = checkAndGetColumn<ColumnConst>(*col.column))
{
const auto & nested_col = checkAndGetColumn<ColumnNullable>(const_column->getDataColumn())->getNestedColumnPtr();
res.insert({ ColumnConst::create(nested_col, col.column->size()), nested_type, col.name});
res.emplace_back(ColumnWithTypeAndName{ ColumnConst::create(nested_col, col.column->size()), nested_type, col.name});
}
else
throw Exception("Illegal column for DataTypeNullable", ErrorCodes::ILLEGAL_COLUMN);
}
else
res.insert(col);
res.emplace_back(col);
}
return res;
}
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args)
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args)
{
std::unordered_set<size_t> args_set(args.begin(), args.end());
return createBlockWithNestedColumnsImpl(block, args_set);
return createBlockWithNestedColumnsImpl(columns, args_set);
}
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args, size_t result)
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args, size_t result)
{
std::unordered_set<size_t> args_set(args.begin(), args.end());
args_set.insert(result);
return createBlockWithNestedColumnsImpl(block, args_set);
return createBlockWithNestedColumnsImpl(columns, args_set);
}
void validateArgumentType(const IFunction & func, const DataTypes & arguments,
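With the return type now ColumnsWithTypeAndName, a plain vector, Block::insert naturally becomes emplace_back, and the nullable unwrapping keeps its three cases: a missing column, a full ColumnNullable, and a ColumnConst over a nullable. A simplified sketch of the unwrapping idea, with std::optional standing in for ColumnNullable and the const case omitted:

// Sketch of the Nullable-unwrapping idea using std::optional as a stand-in
// for ColumnNullable; the real code also handles ColumnConst wrappers.
#include <iostream>
#include <optional>
#include <vector>

std::vector<int> unwrapNullable(const std::vector<std::optional<int>> & col, int null_placeholder)
{
    std::vector<int> nested;
    nested.reserve(col.size());
    for (const auto & v : col)                  // keep nested data, drop the null mask
        nested.push_back(v.value_or(null_placeholder));
    return nested;
}

int main()
{
    std::vector<std::optional<int>> col{1, std::nullopt, 3};
    for (int x : unwrapNullable(col, 0))
        std::cout << x << ' ';                  // prints: 1 0 3
}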

View File

@ -85,10 +85,10 @@ Columns convertConstTupleToConstantElements(const ColumnConst & column);
/// Returns the copy of a given block in which each column specified in
/// the "arguments" parameter is replaced with its respective nested
/// column if it is nullable.
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args);
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args);
/// Similar function as above. Additionally transform the result type if needed.
Block createBlockWithNestedColumns(const Block & block, const ColumnNumbers & args, size_t result);
ColumnsWithTypeAndName createBlockWithNestedColumns(const ColumnsWithTypeAndName & columns, const ColumnNumbers & args, size_t result);
/// Checks argument type at specified index with predicate.
/// throws if there is no argument at specified index or if predicate returns false.

View File

@ -19,19 +19,19 @@ namespace ErrorCodes
template <bool or_null>
void ExecutableFunctionJoinGet<or_null>::execute(Block & block, const ColumnNumbers & arguments, size_t result, size_t)
{
Block keys;
ColumnsWithTypeAndName keys;
for (size_t i = 2; i < arguments.size(); ++i)
{
auto key = block.getByPosition(arguments[i]);
keys.insert(std::move(key));
auto key = block[arguments[i]];
keys.emplace_back(std::move(key));
}
block.getByPosition(result) = join->joinGet(keys, result_block);
block[result] = join->joinGet(keys, result_block);
}
template <bool or_null>
ExecutableFunctionImplPtr FunctionJoinGet<or_null>::prepare(const Block &, const ColumnNumbers &, size_t) const
{
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(join, Block{{return_type->createColumn(), return_type, attr_name}});
return std::make_unique<ExecutableFunctionJoinGet<or_null>>(join, DB::Block{{return_type->createColumn(), return_type, attr_name}});
}
static auto getJoin(const ColumnsWithTypeAndName & arguments, const Context & context)
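joinGet forwards only a suffix of its arguments as lookup keys; the loop above starts at index 2 because, presumably, the two leading arguments identify the join table and the attribute rather than key values. A sketch of that gather step, with strings standing in for ColumnWithTypeAndName:

// Sketch of gathering a subset of argument columns by position; plain
// strings stand in for ColumnWithTypeAndName, and the meaning of the
// first two arguments is an assumption for illustration.
#include <iostream>
#include <string>
#include <vector>

std::vector<std::string> gatherKeys(const std::vector<std::string> & block,
                                    const std::vector<size_t> & arguments)
{
    std::vector<std::string> keys;
    for (size_t i = 2; i < arguments.size(); ++i)   // skip the join/attribute arguments
        keys.emplace_back(block[arguments[i]]);
    return keys;
}

int main()
{
    std::vector<std::string> block{"db.join_table", "attr", "k1", "k2"};
    for (const auto & k : gatherKeys(block, {0, 1, 2, 3}))
        std::cout << k << ' ';                      // prints: k1 k2
}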

View File

@ -1,6 +1,8 @@
#pragma once
#include <Functions/IFunctionImpl.h>
#include <Storages/IStorage_fwd.h>
#include <Storages/TableLockHolder.h>
#include <Core/Block.h>
namespace DB
{
@ -13,7 +15,7 @@ template <bool or_null>
class ExecutableFunctionJoinGet final : public IExecutableFunctionImpl
{
public:
ExecutableFunctionJoinGet(HashJoinPtr join_, const Block & result_block_)
ExecutableFunctionJoinGet(HashJoinPtr join_, const DB::Block & result_block_)
: join(std::move(join_)), result_block(result_block_) {}
static constexpr auto name = or_null ? "joinGetOrNull" : "joinGet";
@ -28,7 +30,7 @@ public:
private:
HashJoinPtr join;
Block result_block;
DB::Block result_block;
};
template <bool or_null>

View File

@ -95,7 +95,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -157,7 +157,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
if (const auto right_arg_typed = checkAndGetColumnConst<ColumnVector<RightType>>(right_arg))
@ -200,7 +200,7 @@ private:
memcpy(&dst_data[rows_size], dst_remaining, rows_remaining * sizeof(Float64));
}
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -209,8 +209,8 @@ private:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & col_left = block.getByPosition(arguments[0]);
const ColumnWithTypeAndName & col_right = block.getByPosition(arguments[1]);
const ColumnWithTypeAndName & col_left = block[arguments[0]];
const ColumnWithTypeAndName & col_right = block[arguments[1]];
auto call = [&](const auto & types) -> bool
{

View File

@ -27,7 +27,7 @@ private:
void executeImpl(Block & block, const ColumnNumbers &, size_t result, size_t input_rows_count) const override
{
block.getByPosition(result).column = block.getByPosition(result).type->createColumnConst(input_rows_count, Impl::value);
block[result].column = block[result].type->createColumnConst(input_rows_count, Impl::value);
}
};

View File

@ -124,7 +124,7 @@ private:
executeInIterations(src_data.data(), dst_data.data(), size);
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -144,7 +144,7 @@ private:
executeInIterations(dst_data.data(), dst_data.data(), size);
block.getByPosition(result).column = std::move(dst);
block[result].column = std::move(dst);
return true;
}
@ -152,7 +152,7 @@ private:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const ColumnWithTypeAndName & col = block.getByPosition(arguments[0]);
const ColumnWithTypeAndName & col = block[arguments[0]];
auto call = [&](const auto & types) -> bool
{

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypesNumber.h>
@ -47,7 +48,7 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t /*input_rows_count*/) const override
{
const auto in = block.getByPosition(arguments.front()).column.get();
const auto in = block[arguments.front()].column.get();
if ( !execute<UInt8>(block, in, result)
&& !execute<UInt16>(block, in, result)
@ -77,7 +78,7 @@ public:
for (const auto i : ext::range(0, size))
out_data[i] = Impl::execute(in_data[i]);
block.getByPosition(result).column = std::move(out);
block[result].column = std::move(out);
return true;
}
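executeImpl above walks a chain of execute<UInt8>, execute<UInt16>, ... calls, each returning false until the concrete column type matches; the matching instantiation then runs the tight per-row loop. A standalone sketch of that try-each-type chain, with std::any standing in for IColumn and checkAndGetColumn:

// Sketch of the try-each-type dispatch chain; std::any/any_cast stand in
// for IColumn and checkAndGetColumn.
#include <any>
#include <cmath>
#include <cstdint>
#include <iostream>
#include <vector>

template <typename T>
bool tryExecute(const std::any & in, std::vector<double> & out)
{
    const auto * col = std::any_cast<std::vector<T>>(&in);
    if (!col)
        return false;                  // wrong type, let the next candidate try
    for (T v : *col)
        out.push_back(std::abs(static_cast<double>(v)));
    return true;
}

int main()
{
    std::any in = std::vector<int32_t>{-1, 2, -3};
    std::vector<double> out;
    if (!tryExecute<uint8_t>(in, out) && !tryExecute<int32_t>(in, out))
        std::cout << "unsupported type\n";
    for (double v : out)
        std::cout << v << ' ';         // prints: 1 2 3
}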

View File

@ -1,3 +1,4 @@
#pragma once
#include <Functions/FunctionHelpers.h>
#include <Functions/GatherUtils/GatherUtils.h>
#include <Functions/GatherUtils/Sources.h>
@ -64,8 +65,8 @@ public:
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
const IColumn * haystack_column = block.getByPosition(arguments[0]).column.get();
const IColumn * needle_column = block.getByPosition(arguments[1]).column.get();
const IColumn * haystack_column = block[arguments[0]].column.get();
const IColumn * needle_column = block[arguments[1]].column.get();
auto col_res = ColumnVector<UInt8>::create();
typename ColumnVector<UInt8>::Container & vec_res = col_res->getData();
@ -83,7 +84,7 @@ public:
else
throw Exception("Illegal combination of columns as arguments of function " + getName(), ErrorCodes::ILLEGAL_COLUMN);
block.getByPosition(result).column = std::move(col_res);
block[result].column = std::move(col_res);
}
private:
@ -158,7 +159,7 @@ public:
#endif
}
void executeImpl(Block & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
void executeImpl(ColumnsWithTypeAndName & block, const ColumnNumbers & arguments, size_t result, size_t input_rows_count) const override
{
selector.selectAndExecute(block, arguments, result, input_rows_count);
}
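This last hunk shows the signature change the branch is converging on: executeImpl receiving ColumnsWithTypeAndName directly instead of a Block, which is what makes the getByPosition-to-operator[] rewrites above mechanical. The migration in miniature, again with simplified stand-in types:

// The executeImpl signature migration in miniature; Block and
// ColumnsWithTypeAndName here are simplified stand-ins.
#include <string>
#include <vector>

struct ColumnWithTypeAndName { std::string name; };
using ColumnsWithTypeAndName = std::vector<ColumnWithTypeAndName>;
using ColumnNumbers = std::vector<size_t>;

// Old shape: a heavyweight Block wrapper.
// void executeImpl(Block & block, const ColumnNumbers &, size_t, size_t);

// New shape: the columns themselves, indexable with operator[].
void executeImpl(ColumnsWithTypeAndName & columns, const ColumnNumbers & arguments,
                 size_t result, size_t /*input_rows_count*/)
{
    columns[result] = columns[arguments[0]];  // operator[] everywhere
}

int main()
{
    ColumnsWithTypeAndName cols{{"in"}, {"out"}};
    executeImpl(cols, {0}, 1, 0);
}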

Some files were not shown because too many files have changed in this diff.