Merge branch 'master' into async_hdfs_read_buffer

This commit is contained in:
taiyang-li 2022-06-03 17:52:09 +08:00
commit f202c35311
301 changed files with 4889 additions and 2079 deletions

View File

@ -36,7 +36,7 @@ set_property(GLOBAL PROPERTY USE_FOLDERS ON)
# Check that submodules are present
if (NOT EXISTS "${ClickHouse_SOURCE_DIR}/contrib/sysroot/README.md")
message (FATAL_ERROR "Submodules are not initialized. Run\n\tgit submodule update --init --recursive")
message (FATAL_ERROR "Submodules are not initialized. Run\n\tgit submodule update --init")
endif ()
# Take care to add prlimit in command line before ccache, or else ccache thinks that

View File

@ -17,15 +17,12 @@ set (SRCS
sleep.cpp
terminalColors.cpp
errnoToString.cpp
ReplxxLineReader.cpp
StringRef.cpp
safeExit.cpp
throwError.cpp
)
if (ENABLE_REPLXX)
list (APPEND SRCS ReplxxLineReader.cpp)
endif ()
if (USE_DEBUG_HELPERS)
get_target_property(MAGIC_ENUM_INCLUDE_DIR ch_contrib::magic_enum INTERFACE_INCLUDE_DIRECTORIES)
# CMake generator expression will do insane quoting when it encounters special character like quotes, spaces, etc.

View File

@ -1,7 +1,4 @@
set(ABSL_ROOT_DIR "${ClickHouse_SOURCE_DIR}/contrib/abseil-cpp")
if(NOT EXISTS "${ABSL_ROOT_DIR}/CMakeLists.txt")
message(FATAL_ERROR " submodule third_party/abseil-cpp is missing. To fix try run: \n git submodule update --init --recursive")
endif()
set(BUILD_TESTING OFF)
set(ABSL_PROPAGATE_CXX_STD ON)
add_subdirectory("${ABSL_ROOT_DIR}" "${ClickHouse_BINARY_DIR}/contrib/abseil-cpp")

View File

@ -5,6 +5,12 @@ if (NOT ENABLE_AMQPCPP)
return()
endif()
# can be removed once libuv build on MacOS with GCC is possible
if (NOT TARGET ch_contrib::uv)
message(STATUS "Not using AMQP-CPP because libuv is disabled")
return()
endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/AMQP-CPP")
set (SRCS
@ -32,21 +38,6 @@ set (SRCS
add_library(_amqp-cpp ${SRCS})
target_compile_options (_amqp-cpp
PRIVATE
-Wno-old-style-cast
-Wno-inconsistent-missing-destructor-override
-Wno-deprecated
-Wno-unused-parameter
-Wno-shadow
-Wno-tautological-type-limit-compare
-Wno-extra-semi
# NOTE: disable all warnings at last because the warning:
# "conversion function converting 'XXX' to itself will never be used"
# doesn't have it's own diagnostic flag yet.
-w
)
target_include_directories (_amqp-cpp SYSTEM BEFORE PUBLIC "${LIBRARY_DIR}/include" "${LIBRARY_DIR}")
target_link_libraries (_amqp-cpp PUBLIC OpenSSL::Crypto OpenSSL::SSL ch_contrib::uv)
add_library (ch_contrib::amqp_cpp ALIAS _amqp-cpp)

View File

@ -20,7 +20,7 @@ endif()
option (ENABLE_PARQUET "Enable parquet" ${ENABLE_PARQUET_DEFAULT})
if (NOT ENABLE_PARQUET)
message(STATUS "Building without Parquet support")
message(STATUS "Not using parquet")
return()
endif()

View File

@ -60,14 +60,6 @@ target_compile_definitions (_avrocpp PUBLIC SNAPPY_CODEC_AVAILABLE)
target_include_directories (_avrocpp PRIVATE ${SNAPPY_INCLUDE_DIR})
target_link_libraries (_avrocpp PRIVATE ch_contrib::snappy)
if (COMPILER_GCC)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
elseif (COMPILER_CLANG)
set (SUPPRESS_WARNINGS -Wno-non-virtual-dtor)
endif ()
target_compile_options(_avrocpp PRIVATE ${SUPPRESS_WARNINGS})
# create a symlink to include headers with <avro/...>
set(AVRO_INCLUDE_DIR "${CMAKE_CURRENT_BINARY_DIR}/include")
ADD_CUSTOM_TARGET(avro_symlink_headers ALL

View File

@ -52,20 +52,6 @@ include("${AZURE_DIR}/cmake-modules/AzureTransportAdapters.cmake")
add_library(_azure_sdk ${AZURE_SDK_UNIFIED_SRC})
if (COMPILER_CLANG)
target_compile_options(_azure_sdk PRIVATE
-Wno-deprecated-copy-dtor
-Wno-extra-semi
-Wno-suggest-destructor-override
-Wno-inconsistent-missing-destructor-override
-Wno-error=unknown-warning-option
)
if (CMAKE_CXX_COMPILER_VERSION VERSION_GREATER_EQUAL 13)
target_compile_options(_azure_sdk PRIVATE -Wno-reserved-identifier)
endif()
endif()
# Originally, on Windows azure-core is built with bcrypt and crypt32 by default
if (TARGET OpenSSL::SSL)
target_link_libraries(_azure_sdk PRIVATE OpenSSL::Crypto OpenSSL::SSL)

View File

@ -1,7 +1,12 @@
# Needed for:
# - securely connecting to an external server, e.g. clickhouse-client --host ... --secure
# - lots of thirdparty libraries
option(ENABLE_SSL "Enable ssl" ${ENABLE_LIBRARIES})
# Actually, so many 3rd party libraries + unit tests need SSL that we cannot disable it
# without breaking the build ...
option(ENABLE_SSL "Enable ssl" ON) # breaks if OFF
# TODO: Making SSL dependent on ENABLE_LIBRARIES is desirable but needs fixing dependent libs + tests.
# option(ENABLE_SSL "Enable ssl" ${ENABLE_LIBRARIES})
if(NOT ENABLE_SSL)
message(STATUS "Not using openssl")

View File

@ -45,7 +45,4 @@ add_library(ch_contrib::brotli ALIAS _brotli)
target_include_directories(_brotli SYSTEM BEFORE PUBLIC "${BROTLI_SOURCE_DIR}/include")
if(M_LIBRARY)
target_link_libraries(_brotli PRIVATE ${M_LIBRARY})
endif()
target_compile_definitions(_brotli PRIVATE BROTLI_BUILD_PORTABLE=1)

View File

@ -1,6 +1,6 @@
option(ENABLE_BZIP2 "Enable bzip2 compression support" ${ENABLE_LIBRARIES})
if (NOT ENABLE_BZIP2)
message (STATUS "bzip2 compression disabled")
message (STATUS "Not using bzip2")
return()
endif()
@ -26,8 +26,4 @@ configure_file (
add_library(_bzip2 ${SRCS})
add_library(ch_contrib::bzip2 ALIAS _bzip2)
# To avoid -Wreserved-id-macro we use SYSTEM:
#
# clickhouse/contrib/bzip2/bzlib.h:23:9: error: macro name is a reserved identifier [-Werror,-Wreserved-id-macro]
# #define _BZLIB_H
target_include_directories(_bzip2 SYSTEM BEFORE PUBLIC "${BZIP2_SOURCE_DIR}" "${BZIP2_BINARY_DIR}")

View File

@ -81,16 +81,12 @@ set (CAPNPC_SRCS
add_library(_capnpc ${CAPNPC_SRCS})
target_link_libraries(_capnpc PUBLIC _capnp)
# The library has substandard code
if (COMPILER_GCC)
set (SUPPRESS_WARNINGS -w)
elseif (COMPILER_CLANG)
set (SUPPRESS_WARNINGS -w)
if (COMPILER_CLANG)
set (CAPNP_PRIVATE_CXX_FLAGS -fno-char8_t)
endif ()
target_compile_options(_kj PRIVATE ${SUPPRESS_WARNINGS} ${CAPNP_PRIVATE_CXX_FLAGS})
target_compile_options(_capnp PRIVATE ${SUPPRESS_WARNINGS} ${CAPNP_PRIVATE_CXX_FLAGS})
target_compile_options(_capnpc PRIVATE ${SUPPRESS_WARNINGS} ${CAPNP_PRIVATE_CXX_FLAGS})
target_compile_options(_kj PRIVATE ${CAPNP_PRIVATE_CXX_FLAGS})
target_compile_options(_capnp PRIVATE ${CAPNP_PRIVATE_CXX_FLAGS})
target_compile_options(_capnpc PRIVATE ${CAPNP_PRIVATE_CXX_FLAGS})
add_library(ch_contrib::capnp ALIAS _capnpc)

View File

@ -5,6 +5,12 @@ if (NOT ENABLE_CASSANDRA)
return()
endif()
# can be removed once libuv build on MacOS with GCC is possible
if (NOT TARGET ch_contrib::uv)
message(STATUS "Not using cassandra because libuv is disabled")
return()
endif()
if (APPLE)
set(CMAKE_MACOSX_RPATH ON)
endif()

View File

@ -1,5 +1,5 @@
if (NOT ENABLE_KAFKA)
message(STATUS "Not using librdkafka (skip cppkafka)")
message(STATUS "Not using kafka")
return()
endif()

View File

@ -5,7 +5,7 @@ elseif(ENABLE_FASTOPS)
endif()
if(NOT ENABLE_FASTOPS)
message(STATUS "Not using fast vectorized mathematical functions library by Mikhail Parakhin")
message(STATUS "Not using fastops")
return()
endif()

View File

@ -1,22 +1,24 @@
set(FMT_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/fmtlib")
set (SRCS
# NOTE: do not build module for now:
# ../fmtlib/src/fmt.cc
../fmtlib/src/format.cc
../fmtlib/src/os.cc
${FMT_SOURCE_DIR}/src/format.cc
${FMT_SOURCE_DIR}/src/os.cc
../fmtlib/include/fmt/args.h
../fmtlib/include/fmt/chrono.h
../fmtlib/include/fmt/color.h
../fmtlib/include/fmt/compile.h
../fmtlib/include/fmt/core.h
../fmtlib/include/fmt/format.h
../fmtlib/include/fmt/format-inl.h
../fmtlib/include/fmt/locale.h
../fmtlib/include/fmt/os.h
../fmtlib/include/fmt/ostream.h
../fmtlib/include/fmt/printf.h
../fmtlib/include/fmt/ranges.h
../fmtlib/include/fmt/xchar.h
${FMT_SOURCE_DIR}/include/fmt/args.h
${FMT_SOURCE_DIR}/include/fmt/chrono.h
${FMT_SOURCE_DIR}/include/fmt/color.h
${FMT_SOURCE_DIR}/include/fmt/compile.h
${FMT_SOURCE_DIR}/include/fmt/core.h
${FMT_SOURCE_DIR}/include/fmt/format.h
${FMT_SOURCE_DIR}/include/fmt/format-inl.h
${FMT_SOURCE_DIR}/include/fmt/locale.h
${FMT_SOURCE_DIR}/include/fmt/os.h
${FMT_SOURCE_DIR}/include/fmt/ostream.h
${FMT_SOURCE_DIR}/include/fmt/printf.h
${FMT_SOURCE_DIR}/include/fmt/ranges.h
${FMT_SOURCE_DIR}/include/fmt/xchar.h
)
add_library(_fmt ${SRCS})

View File

@ -34,8 +34,5 @@ add_library(_h3 ${SRCS})
target_include_directories(_h3 SYSTEM PUBLIC "${H3_SOURCE_DIR}/include")
target_include_directories(_h3 SYSTEM PUBLIC "${H3_BINARY_DIR}/include")
target_compile_definitions(_h3 PRIVATE H3_HAVE_VLA)
if(M_LIBRARY)
target_link_libraries(_h3 PRIVATE ${M_LIBRARY})
endif()
add_library(ch_contrib::h3 ALIAS _h3)

View File

@ -5,7 +5,7 @@ elseif(ENABLE_HIVE)
endif()
if (NOT ENABLE_HIVE)
message("Hive disabled")
message(STATUS "Not using hive")
return()
endif()

View File

@ -481,10 +481,6 @@ target_include_directories(_icui18n SYSTEM PUBLIC "${ICU_SOURCE_DIR}/i18n/")
target_compile_definitions(_icuuc PRIVATE -DU_COMMON_IMPLEMENTATION)
target_compile_definitions(_icui18n PRIVATE -DU_I18N_IMPLEMENTATION)
if (COMPILER_CLANG)
target_compile_options(_icudata PRIVATE -Wno-unused-command-line-argument)
endif ()
add_library(_icu INTERFACE)
target_link_libraries(_icu INTERFACE _icui18n _icuuc _icudata)
add_library(ch_contrib::icu ALIAS _icu)

View File

@ -180,7 +180,6 @@ if (USE_UNWIND)
target_link_libraries (_jemalloc PRIVATE unwind)
endif ()
target_compile_options(_jemalloc PRIVATE -Wno-redundant-decls)
# for RTLD_NEXT
target_compile_options(_jemalloc PRIVATE -D_GNU_SOURCE)

View File

@ -6,7 +6,7 @@ elseif(ENABLE_CPUID)
endif()
if (NOT ENABLE_CPUID)
message("Not using cpuid")
message(STATUS "Not using cpuid")
return()
endif()
@ -27,8 +27,5 @@ add_library (_cpuid ${SRCS})
target_include_directories (_cpuid SYSTEM PUBLIC "${LIBRARY_DIR}")
target_compile_definitions (_cpuid PRIVATE VERSION="v0.4.1")
if (COMPILER_CLANG)
target_compile_options (_cpuid PRIVATE -Wno-reserved-id-macro)
endif ()
add_library(ch_contrib::cpuid ALIAS _cpuid)

View File

@ -1,7 +1,7 @@
option(ENABLE_GSASL_LIBRARY "Enable gsasl library" ${ENABLE_LIBRARIES})
if (NOT ENABLE_GSASL_LIBRARY)
message(STATUS "Not using gsasl library")
message(STATUS "Not using gsasl")
return()
endif()

View File

@ -1,3 +1,4 @@
# once fixed, please remove similar places in CMakeLists of libuv users (search "ch_contrib::uv")
if (OS_DARWIN AND COMPILER_GCC)
message (WARNING "libuv cannot be built with GCC in macOS due to a bug: https://gcc.gnu.org/bugzilla/show_bug.cgi?id=93082")
return()

View File

@ -53,9 +53,6 @@ set(SRCS
add_library(_libxml2 ${SRCS})
target_link_libraries(_libxml2 PRIVATE ch_contrib::zlib)
if(M_LIBRARY)
target_link_libraries(_libxml2 PRIVATE ${M_LIBRARY})
endif()
target_include_directories(_libxml2 BEFORE PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/linux_x86_64/include")
target_include_directories(_libxml2 BEFORE PUBLIC "${LIBXML2_SOURCE_DIR}/include")

View File

@ -1,6 +1,6 @@
option(ENABLE_MINIZIP "Enable minizip-ng the zip manipulation library" ${ENABLE_LIBRARIES})
if (NOT ENABLE_MINIZIP)
message (STATUS "minizip-ng disabled")
message (STATUS "Not using minizip-ng")
return()
endif()

View File

@ -2,12 +2,12 @@ if (NOT ENABLE_ODBC)
return ()
endif ()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/nanodbc")
if (NOT TARGET ch_contrib::unixodbc)
message(FATAL_ERROR "Configuration error: unixodbc is not a target")
endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/nanodbc")
set (SRCS
"${LIBRARY_DIR}/nanodbc/nanodbc.cpp"
)

View File

@ -1,10 +1,3 @@
option (ENABLE_REPLXX "Enable replxx support" ${ENABLE_LIBRARIES})
if (NOT ENABLE_REPLXX)
message (STATUS "Not using replxx")
return()
endif()
set (LIBRARY_DIR "${ClickHouse_SOURCE_DIR}/contrib/replxx")
set(SRCS
@ -22,9 +15,4 @@ set(SRCS
add_library (_replxx ${SRCS})
target_include_directories(_replxx SYSTEM PUBLIC "${LIBRARY_DIR}/include")
if (COMPILER_CLANG)
target_compile_options(_replxx PRIVATE -Wno-documentation)
endif ()
add_library(ch_contrib::replxx ALIAS _replxx)

View File

@ -149,7 +149,3 @@ target_link_libraries(_s2 PRIVATE
target_include_directories(_s2 SYSTEM BEFORE PUBLIC "${S2_SOURCE_DIR}/")
target_include_directories(_s2 SYSTEM PUBLIC "${ABSL_SOURCE_DIR}")
if(M_LIBRARY)
target_link_libraries(_s2 PRIVATE ${M_LIBRARY})
endif()

View File

@ -1,7 +1,7 @@
option(ENABLE_THRIFT "Enable Thrift" ${ENABLE_LIBRARIES})
if (NOT ENABLE_THRIFT)
message (STATUS "thrift disabled")
message (STATUS "Not using thrift")
return()
endif()

View File

@ -294,14 +294,6 @@ target_include_directories (_unixodbc
"${LIBRARY_DIR}/include"
)
target_compile_definitions (_unixodbc PRIVATE -DHAVE_CONFIG_H)
target_compile_options (_unixodbc
PRIVATE
-Wno-dangling-else
-Wno-parentheses
-Wno-misleading-indentation
-Wno-unknown-warning-option
-Wno-reserved-id-macro
-O2
)
target_compile_options (_unixodbc PRIVATE -O2) # intended?
add_library (ch_contrib::unixodbc ALIAS _unixodbc)

View File

@ -92,8 +92,6 @@ function run_tests()
if [[ -n "$USE_DATABASE_REPLICATED" ]] && [[ "$USE_DATABASE_REPLICATED" -eq 1 ]]; then
ADDITIONAL_OPTIONS+=('--replicated-database')
# Cannot be used with replicated database, due to distributed_ddl_output_mode=none
ADDITIONAL_OPTIONS+=('--no-left-queries-check')
ADDITIONAL_OPTIONS+=('--jobs')
ADDITIONAL_OPTIONS+=('2')
else

View File

@ -81,7 +81,7 @@ $ ./src/unit_tests_dbms --gtest_filter=LocalAddress*
## Performance Tests {#performance-tests}
Performance tests allow to measure and compare performance of some isolated part of ClickHouse on synthetic queries. Tests are located at `tests/performance`. Each test is represented by `.xml` file with description of test case. Tests are run with `docker/tests/performance-comparison` tool . See the readme file for invocation.
Performance tests allow to measure and compare performance of some isolated part of ClickHouse on synthetic queries. Tests are located at `tests/performance`. Each test is represented by `.xml` file with description of test case. Tests are run with `docker/test/performance-comparison` tool . See the readme file for invocation.
Each test run one or multiple queries (possibly with combinations of parameters) in a loop.

View File

@ -149,6 +149,8 @@ Each element of [Nested](../sql-reference/data-types/nested-data-structures/nest
In input data, ENUM values can be represented as names or as ids. First, we try to match the input value to the ENUM name. If we fail and the input value is a number, we try to match this number to ENUM id.
If input data contains only ENUM ids, it's recommended to enable the setting [input_format_tsv_enum_as_number](../operations/settings/settings.md#settings-input_format_tsv_enum_as_number) to optimize ENUM parsing.
While importing data, you can skip some first rows using setting [input_format_tsv_skip_first_lines](../operations/settings/settings.md#settings-input_format_tsv_skip_first_lines)
For example:
``` sql
@ -429,6 +431,8 @@ If input data contains only ENUM ids, it's recommended to enable the setting [in
The CSV format supports the output of totals and extremes the same way as `TabSeparated`.
While importing data, you can skip some first rows using setting [input_format_csv_skip_first_lines](../operations/settings/settings.md#settings-input_format_csv_skip_first_lines)
## CSVWithNames {#csvwithnames}
Also prints the header row with column names, similar to [TabSeparatedWithNames](#tabseparatedwithnames).

View File

@ -5,7 +5,7 @@ sidebar_label: Caches
# Cache Types {#cache-types}
When performing queries, ClichHouse uses different caches.
When performing queries, ClickHouse uses different caches.
Main cache types:

View File

@ -1745,13 +1745,3 @@ Possible values:
- Positive integer.
Default value: `10000`.
## global_memory_usage_overcommit_max_wait_microseconds {#global_memory_usage_overcommit_max_wait_microseconds}
Sets maximum waiting time for global overcommit tracker.
Possible values:
- Positive integer.
Default value: `200`.

View File

@ -521,6 +521,18 @@ Result:
└─────┴────────┘
```
## input_format_tsv_skip_first_lines {#settings-input_format_tsv_skip_first_lines}
The number of lines to skip at the beginning of data in TSV input format.
Default value: `0`.
## input_format_csv_skip_first_lines {#settings-input_format_csv_skip_first_lines}
The number of lines to skip at the beginning of data in CSV input format.
Default value: `0`.
## input_format_null_as_default {#settings-input-format-null-as-default}
Enables or disables the initialization of [NULL](../../sql-reference/syntax.md#null-literal) fields with [default values](../../sql-reference/statements/create/table.md#create-default-values), if data type of these fields is not [nullable](../../sql-reference/data-types/nullable.md#data_type-nullable).
@ -4279,7 +4291,7 @@ Maximum time thread will wait for memory to be freed in the case of memory overc
If the timeout is reached and memory is not freed, an exception is thrown.
Read more about [memory overcommit](memory-overcommit.md).
Default value: `200`.
Default value: `5000000`.
## memory_overcommit_ratio_denominator_for_user

View File

@ -14,6 +14,11 @@ The `system.part_log` table contains the following columns:
- `REMOVE_PART` — Removing or detaching a data part using [DETACH PARTITION](../../sql-reference/statements/alter/partition.md#alter_detach-partition).
- `MUTATE_PART` — Mutating of a data part.
- `MOVE_PART` — Moving the data part from the one disk to another one.
- `merge_reason` ([Enum8](../../sql-reference/data-types/enum.md)) — The reason for the event with type `MERGE_PARTS`. Can have one of the following values:
- `NOT_A_MERGE` — The current event has the type other than `MERGE_PARTS`.
- `REGULAR_MERGE` — Some regular merge.
- `TTL_DELETE_MERGE` — Cleaning up expired data.
- `TTL_RECOMPRESS_MERGE` — Recompressing data part with the.
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — Event date.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — Event time.
- `event_time_microseconds` ([DateTime64](../../sql-reference/data-types/datetime64.md)) — Event time with microseconds precision.
@ -46,6 +51,7 @@ Row 1:
──────
query_id: 983ad9c7-28d5-4ae1-844e-603116b7de31
event_type: NewPart
merge_reason: NotAMerge
event_date: 2021-02-02
event_time: 2021-02-02 11:14:28
event_time_microseconds: 2021-02-02 11:14:28.861919

View File

@ -71,7 +71,7 @@ INSERT INTO [db.]table [(c1, c2, c3)] FORMAT format_name data_set
INSERT INTO [db.]table [(c1, c2, c3)] FORMAT Values (v11, v12, v13), (v21, v22, v23), ...
```
ClickHouse会清除数据前所有的空白字符与一行摘要信息(如果需要的话)。所以在进行查询时,我们建议您将数据放入到输入输出格式名称后的新的一行中去(如果数据是以空白字符开始的,这将非常重要)。
ClickHouse会清除数据前所有的空白字符与一个换行符(如果有换行符的话)。所以在进行查询时,我们建议您将数据放入到输入输出格式名称后的新的一行中去(如果数据是以空白字符开始的,这将非常重要)。
示例:
@ -83,6 +83,10 @@ INSERT INTO t FORMAT TabSeparated
在使用命令行客户端或HTTP客户端时你可以将具体的查询语句与数据分开发送。更多具体信息请参考«[客户端](../../interfaces/index.md#interfaces)»部分。
### 限制 {#constraints}
如果表中有一些[限制](../../sql-reference/statements/create/table.md#constraints),,数据插入时会逐行进行数据校验,如果这里面包含了不符合限制条件的数据,服务将会抛出包含限制信息的异常,这个语句也会被停止执行。
### 使用`SELECT`的结果写入 {#insert_query_insert-select}
``` sql
@ -96,6 +100,66 @@ INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
系统不支持的其他用于修改数据的查询:`UPDATE`, `DELETE`, `REPLACE`, `MERGE`, `UPSERT`, `INSERT UPDATE`
但是,您可以使用 `ALTER TABLE ... DROP PARTITION`查询来删除一些旧的数据。
如果 `SELECT` 查询中包含了 [input()](../../sql-reference/table-functions/input.md) 函数,那么 `FORMAT` 必须出现在查询语句的最后。
如果某一列限制了值不能是NULL那么插入NULL的时候就会插入这个列类型的默认数据可以通过设置 [insert_null_as_default](../../operations/settings/settings.md#insert_null_as_default) 插入NULL。
### 从文件向表中插入数据 {#inserting-data-from-a-file}
``` sql
INSERT INTO [db.]table [(c1, c2, c3)] FROM INFILE file_name [COMPRESSION type] FORMAT format_name
```
使用上面的语句可以从客户端的文件上读取数据并插入表中,`file_name` 和 `type` 都是 `String` 类型,输入文件的[格式](../../interfaces/formats.md) 一定要在 `FORMAT` 语句中设置。
支持读取压缩文件。默认会去读文件的拓展名作为文件的压缩方式,或者也可以在 `COMPRESSION` 语句中指明,支持的文件压缩格式如下:`'none'` `'gzip'` `'deflate'` `'br'` `'xz'` `'zstd'` `'lz4'` `'bz2'`
这个功能在 [command-line client](../../interfaces/cli.md) 和 [clickhouse-local](../../operations/utilities/clickhouse-local.md) 是可用的。
**样例**
```bash
echo 1,A > input.csv ; echo 2,B >> input.csv
clickhouse-client --query="CREATE TABLE table_from_file (id UInt32, text String) ENGINE=MergeTree() ORDER BY id;"
clickhouse-client --query="INSERT INTO table_from_file FROM INFILE 'input.csv' FORMAT CSV;"
clickhouse-client --query="SELECT * FROM table_from_file FORMAT PrettyCompact;"
```
结果:
```text
┌─id─┬─text─┐
│ 1 │ A │
│ 2 │ B │
└────┴──────┘
```
### 插入表函数 {#inserting-into-table-function}
数据可以通过 [table functions](../../sql-reference/table-functions/index.md) 方法插入。
``` sql
INSERT INTO [TABLE] FUNCTION table_func ...
```
**例如**
可以这样使用[remote](../../sql-reference/table-functions/index.md#remote) 表函数:
``` sql
CREATE TABLE simple_table (id UInt32, text String) ENGINE=MergeTree() ORDER BY id;
INSERT INTO TABLE FUNCTION remote('localhost', default.simple_table)
VALUES (100, 'inserted via remote()');
SELECT * FROM simple_table;
```
结果:
``` text
┌──id─┬─text──────────────────┐
│ 100 │ inserted via remote() │
└─────┴───────────────────────┘
```
### 性能的注意事项 {#xing-neng-de-zhu-yi-shi-xiang}
在进行`INSERT`时将会对写入的数据进行一些处理,按照主键排序,按照月份对数据进行分区等。所以如果在您的写入数据中包含多个月份的混合数据时,将会显著的降低`INSERT`的性能。为了避免这种情况:
@ -108,4 +172,6 @@ INSERT INTO [db.]table [(c1, c2, c3)] SELECT ...
- 数据总是被实时的写入。
- 写入的数据已经按照时间排序。
也可以异步的、小规模的插入数据,这些数据会被合并成多个批次,然后安全地写入到表中,通过设置[async_insert](../../operations/settings/settings.md#async-insert)可以使用异步插入的方式请注意异步插入的方式只支持HTTP协议并且不支持数据去重。
[来源文章](https://clickhouse.com/docs/en/query_language/insert_into/) <!--hide-->

View File

@ -546,8 +546,9 @@ static void sanityChecks(Server & server)
#if defined(OS_LINUX)
try
{
if (readString("/sys/devices/system/clocksource/clocksource0/current_clocksource").find("tsc") == std::string::npos)
server.context()->addWarningMessage("Linux is not using a fast TSC clock source. Performance can be degraded.");
const char * filename = "/sys/devices/system/clocksource/clocksource0/current_clocksource";
if (readString(filename).find("tsc") == std::string::npos)
server.context()->addWarningMessage("Linux is not using a fast TSC clock source. Performance can be degraded. Check " + String(filename));
}
catch (...)
{
@ -555,8 +556,9 @@ static void sanityChecks(Server & server)
try
{
if (readNumber("/proc/sys/vm/overcommit_memory") == 2)
server.context()->addWarningMessage("Linux memory overcommit is disabled.");
const char * filename = "/proc/sys/vm/overcommit_memory";
if (readNumber(filename) == 2)
server.context()->addWarningMessage("Linux memory overcommit is disabled. Check " + String(filename));
}
catch (...)
{
@ -564,8 +566,9 @@ static void sanityChecks(Server & server)
try
{
if (readString("/sys/kernel/mm/transparent_hugepage/enabled").find("[always]") != std::string::npos)
server.context()->addWarningMessage("Linux transparent hugepages are set to \"always\".");
const char * filename = "/sys/kernel/mm/transparent_hugepage/enabled";
if (readString(filename).find("[always]") != std::string::npos)
server.context()->addWarningMessage("Linux transparent hugepages are set to \"always\". Check " + String(filename));
}
catch (...)
{
@ -573,8 +576,9 @@ static void sanityChecks(Server & server)
try
{
if (readNumber("/proc/sys/kernel/pid_max") < 30000)
server.context()->addWarningMessage("Linux max PID is too low.");
const char * filename = "/proc/sys/kernel/pid_max";
if (readNumber(filename) < 30000)
server.context()->addWarningMessage("Linux max PID is too low. Check " + String(filename));
}
catch (...)
{
@ -582,8 +586,9 @@ static void sanityChecks(Server & server)
try
{
if (readNumber("/proc/sys/kernel/threads-max") < 30000)
server.context()->addWarningMessage("Linux threads max count is too low.");
const char * filename = "/proc/sys/kernel/threads-max";
if (readNumber(filename) < 30000)
server.context()->addWarningMessage("Linux threads max count is too low. Check " + String(filename));
}
catch (...)
{
@ -591,7 +596,7 @@ static void sanityChecks(Server & server)
std::string dev_id = getBlockDeviceId(data_path);
if (getBlockDeviceType(dev_id) == BlockDeviceType::ROT && getBlockDeviceReadAheadBytes(dev_id) == 0)
server.context()->addWarningMessage("Rotational disk with disabled readahead is in use. Performance can be degraded.");
server.context()->addWarningMessage("Rotational disk with disabled readahead is in use. Performance can be degraded. Used for data: " + String(data_path));
#endif
try
@ -1095,8 +1100,6 @@ int Server::main(const std::vector<std::string> & /*args*/)
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
UInt64 max_overcommit_wait_time = config->getUInt64("global_memory_usage_overcommit_max_wait_microseconds", 200);
global_overcommit_tracker->setMaxWaitTime(max_overcommit_wait_time);
total_memory_tracker.setOvercommitTracker(global_overcommit_tracker);
// FIXME logging-related things need synchronization -- see the 'Logger * log' saved

View File

@ -6,9 +6,11 @@
#include <Common/Arena.h>
#include <Common/LRUCache.h>
#include <Common/assert_cast.h>
#include "Columns/IColumn.h"
#include <base/unaligned.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnLowCardinality.h>
@ -83,8 +85,11 @@ struct HashMethodString
HashMethodString(const ColumnRawPtrs & key_columns, const Sizes & /*key_sizes*/, const HashMethodContextPtr &)
{
const IColumn & column = *key_columns[0];
const ColumnString & column_string = assert_cast<const ColumnString &>(column);
const IColumn * column = key_columns[0];
if (isColumnConst(*column))
column = &assert_cast<const ColumnConst &>(*column).getDataColumn();
const ColumnString & column_string = assert_cast<const ColumnString &>(*column);
offsets = column_string.getOffsets().data();
chars = column_string.getChars().data();
}

View File

@ -221,7 +221,7 @@ bool haveAVX512F() noexcept
&& (our_xgetbv(0) & 6u) == 6u // XMM state and YMM state are enabled by OS
&& ((our_xgetbv(0) >> 5) & 7u) == 7u // ZMM state is enabled by OS
&& CpuInfo(0x0).registers.eax >= 0x7 // leaf 7 is present
&& ((CpuInfo(0x7).registers.ebx >> 16) & 1u); // AVX512F bit
&& ((CpuInfo(0x7, 0).registers.ebx >> 16) & 1u); // AVX512F bit
#else
return false;
#endif

View File

@ -638,10 +638,6 @@ void LRUFileCache::remove(const Key & key)
if (fs::exists(key_path))
fs::remove(key_path);
#ifndef NDEBUG
assertCacheCorrectness(cache_lock);
#endif
}
void LRUFileCache::remove()
@ -1053,8 +1049,7 @@ void LRUFileCache::assertCacheCellsCorrectness(
if (file_segment->reserved_size != 0)
{
assert(cell.queue_iterator);
/// FIXME: this is too slow, need to make it O(1)
/// assert(queue.contains(file_segment->key(), file_segment->offset(), cache_lock));
assert(queue.contains(file_segment->key(), file_segment->offset(), cache_lock));
}
}
}

View File

@ -301,7 +301,7 @@ private:
size_t getFileSegmentsNumUnlocked(std::lock_guard<std::mutex> & cache_lock) const;
static void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
void assertCacheCellsCorrectness(const FileSegmentsByOffset & cells_by_offset, std::lock_guard<std::mutex> & cache_lock);
public:
String dumpStructure(const Key & key_) override;

View File

@ -16,6 +16,7 @@
#include <cmath>
#include <random>
#include <cstdlib>
#include <string>
#ifdef MEMORY_TRACKER_DEBUG_CHECKS
@ -52,11 +53,37 @@ namespace DB
}
}
namespace
{
inline std::string_view toDescription(OvercommitResult result)
{
switch (result)
{
case OvercommitResult::NONE:
return "Memory overcommit isn't used. OvercommitTracker isn't set.";
case OvercommitResult::DISABLED:
return "Memory overcommit isn't used. Waiting time or orvercommit denominator are set to zero.";
case OvercommitResult::MEMORY_FREED:
throw DB::Exception(DB::ErrorCodes::LOGICAL_ERROR, "OvercommitResult::MEMORY_FREED shouldn't be asked for description");
case OvercommitResult::SELECTED:
return "Query was selected to stop by OvercommitTracker.";
case OvercommitResult::TIMEOUTED:
return "Waiting timeout for memory to be freed is reached.";
case OvercommitResult::NOT_ENOUGH_FREED:
return "Memory overcommit has freed not enough memory.";
}
}
}
namespace ProfileEvents
{
extern const Event QueryMemoryLimitExceeded;
}
using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
@ -189,11 +216,11 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
if (unlikely(current_hard_limit && will_be > current_hard_limit) && memoryTrackerCanThrow(level, false) && throw_if_memory_exceeded)
{
bool need_to_throw = true;
OvercommitResult overcommit_result = OvercommitResult::NONE;
if (auto * overcommit_tracker_ptr = overcommit_tracker.load(std::memory_order_relaxed); overcommit_tracker_ptr != nullptr && query_tracker != nullptr)
need_to_throw = overcommit_tracker_ptr->needToStopQuery(query_tracker, size);
overcommit_result = overcommit_tracker_ptr->needToStopQuery(query_tracker, size);
if (need_to_throw)
if (overcommit_result != OvercommitResult::MEMORY_FREED)
{
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
MemoryTrackerBlockerInThread untrack_lock(VariableContext::Global);
@ -201,12 +228,13 @@ void MemoryTracker::allocImpl(Int64 size, bool throw_if_memory_exceeded, MemoryT
const auto * description = description_ptr.load(std::memory_order_relaxed);
throw DB::Exception(
DB::ErrorCodes::MEMORY_LIMIT_EXCEEDED,
"Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}",
"Memory limit{}{} exceeded: would use {} (attempt to allocate chunk of {} bytes), maximum: {}. OvercommitTracker decision: {}.",
description ? " " : "",
description ? description : "",
formatReadableSizeWithBinarySuffix(will_be),
size,
formatReadableSizeWithBinarySuffix(current_hard_limit));
formatReadableSizeWithBinarySuffix(current_hard_limit),
toDescription(overcommit_result));
}
else
{
@ -337,6 +365,12 @@ OvercommitRatio MemoryTracker::getOvercommitRatio(Int64 limit)
}
void MemoryTracker::setOvercommitWaitingTime(UInt64 wait_time)
{
max_wait_time.store(wait_time * 1us, std::memory_order_relaxed);
}
void MemoryTracker::resetCounters()
{
amount.store(0, std::memory_order_relaxed);

View File

@ -1,6 +1,7 @@
#pragma once
#include <atomic>
#include <chrono>
#include <base/types.h>
#include <Common/CurrentMetrics.h>
#include <Common/VariableContext.h>
@ -73,6 +74,8 @@ private:
/// This description will be used as prefix into log messages (if isn't nullptr)
std::atomic<const char *> description_ptr = nullptr;
std::atomic<std::chrono::microseconds> max_wait_time;
std::atomic<OvercommitTracker *> overcommit_tracker = nullptr;
bool updatePeak(Int64 will_be, bool log_memory_usage);
@ -186,6 +189,13 @@ public:
OvercommitRatio getOvercommitRatio();
OvercommitRatio getOvercommitRatio(Int64 limit);
std::chrono::microseconds getOvercommitWaitingTime()
{
return max_wait_time.load(std::memory_order_relaxed);
}
void setOvercommitWaitingTime(UInt64 wait_time);
void setOvercommitTracker(OvercommitTracker * tracker) noexcept
{
overcommit_tracker.store(tracker, std::memory_order_relaxed);

View File

@ -2,15 +2,20 @@
#include <chrono>
#include <mutex>
#include <Common/ProfileEvents.h>
#include <Interpreters/ProcessList.h>
namespace ProfileEvents
{
extern const Event MemoryOvercommitWaitTimeMicroseconds;
}
using namespace std::chrono_literals;
constexpr std::chrono::microseconds ZERO_MICROSEC = 0us;
OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
: max_wait_time(ZERO_MICROSEC)
, picked_tracker(nullptr)
: picked_tracker(nullptr)
, cancellation_state(QueryCancellationState::NONE)
, global_mutex(global_mutex_)
, freed_memory(0)
@ -18,13 +23,7 @@ OvercommitTracker::OvercommitTracker(std::mutex & global_mutex_)
, allow_release(true)
{}
void OvercommitTracker::setMaxWaitTime(UInt64 wait_time)
{
std::lock_guard guard(overcommit_m);
max_wait_time = wait_time * 1us;
}
bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
OvercommitResult OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
{
// NOTE: Do not change the order of locks
//
@ -35,8 +34,10 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
std::unique_lock<std::mutex> global_lock(global_mutex);
std::unique_lock<std::mutex> lk(overcommit_m);
auto max_wait_time = tracker->getOvercommitWaitingTime();
if (max_wait_time == ZERO_MICROSEC)
return true;
return OvercommitResult::DISABLED;
pickQueryToExclude();
assert(cancellation_state != QueryCancellationState::NONE);
@ -50,7 +51,7 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
// picked_tracker to be not null pointer.
assert(cancellation_state == QueryCancellationState::SELECTED);
cancellation_state = QueryCancellationState::NONE;
return true;
return OvercommitResult::DISABLED;
}
if (picked_tracker == tracker)
{
@ -58,17 +59,20 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
// It may happen even when current state is RUNNING, because
// ThreadStatus::~ThreadStatus may call MemoryTracker::alloc.
cancellation_state = QueryCancellationState::RUNNING;
return true;
return OvercommitResult::SELECTED;
}
allow_release = true;
required_memory += amount;
required_per_thread[tracker] = amount;
auto wait_start_time = std::chrono::system_clock::now();
bool timeout = !cv.wait_for(lk, max_wait_time, [this, tracker]()
{
return required_per_thread[tracker] == 0 || cancellation_state == QueryCancellationState::NONE;
});
auto wait_end_time = std::chrono::system_clock::now();
ProfileEvents::increment(ProfileEvents::MemoryOvercommitWaitTimeMicroseconds, (wait_end_time - wait_start_time) / 1us);
LOG_DEBUG(getLogger(), "Memory was{} freed within timeout", (timeout ? " not" : ""));
required_memory -= amount;
@ -84,7 +88,12 @@ bool OvercommitTracker::needToStopQuery(MemoryTracker * tracker, Int64 amount)
// As we don't need to free memory, we can continue execution of the selected query.
if (required_memory == 0 && cancellation_state == QueryCancellationState::SELECTED)
reset();
return timeout || still_need != 0;
if (timeout)
return OvercommitResult::TIMEOUTED;
if (still_need != 0)
return OvercommitResult::NOT_ENOUGH_FREED;
else
return OvercommitResult::MEMORY_FREED;
}
void OvercommitTracker::tryContinueQueryExecutionAfterFree(Int64 amount)

View File

@ -36,6 +36,16 @@ struct OvercommitRatio
class MemoryTracker;
enum class OvercommitResult
{
NONE,
DISABLED,
MEMORY_FREED,
SELECTED,
TIMEOUTED,
NOT_ENOUGH_FREED,
};
enum class QueryCancellationState
{
NONE = 0, // Hard limit is not reached, there is no selected query to kill.
@ -52,9 +62,7 @@ enum class QueryCancellationState
// is killed to free memory.
struct OvercommitTracker : boost::noncopyable
{
void setMaxWaitTime(UInt64 wait_time);
bool needToStopQuery(MemoryTracker * tracker, Int64 amount);
OvercommitResult needToStopQuery(MemoryTracker * tracker, Int64 amount);
void tryContinueQueryExecutionAfterFree(Int64 amount);
@ -72,8 +80,6 @@ protected:
std::mutex overcommit_m;
std::condition_variable cv;
std::chrono::microseconds max_wait_time;
// Specifies memory tracker of the chosen to stop query.
// If soft limit is not set, all the queries which reach hard limit must stop.
// This case is represented as picked tracker pointer is set to nullptr and

View File

@ -199,6 +199,7 @@
M(RealTimeMicroseconds, "Total (wall clock) time spent in processing (queries and other tasks) threads (not that this is a sum).") \
M(UserTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in user space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(SystemTimeMicroseconds, "Total time spent in processing (queries and other tasks) threads executing CPU instructions in OS kernel space. This include time CPU pipeline was stalled due to cache misses, branch mispredictions, hyper-threading, etc.") \
M(MemoryOvercommitWaitTimeMicroseconds, "Total time spent in waiting for memory to be freed in OvercommitTracker.") \
M(SoftPageFaults, "") \
M(HardPageFaults, "") \
\

View File

@ -16,6 +16,8 @@ UInt32 getSupportedArchs()
result |= static_cast<UInt32>(TargetArch::AVX2);
if (Cpu::CpuFlagsCache::have_AVX512F)
result |= static_cast<UInt32>(TargetArch::AVX512F);
if (Cpu::CpuFlagsCache::have_AVX512BW)
result |= static_cast<UInt32>(TargetArch::AVX512BW);
return result;
}
@ -34,6 +36,7 @@ String toString(TargetArch arch)
case TargetArch::AVX: return "avx";
case TargetArch::AVX2: return "avx2";
case TargetArch::AVX512F: return "avx512f";
case TargetArch::AVX512BW: return "avx512bw";
}
__builtin_unreachable();

View File

@ -80,6 +80,7 @@ enum class TargetArch : UInt32
AVX = (1 << 1),
AVX2 = (1 << 2),
AVX512F = (1 << 3),
AVX512BW = (1 << 4),
};
/// Runtime detection.

View File

@ -40,15 +40,17 @@ static constexpr UInt64 WAIT_TIME = 4'000'000;
template <typename T>
void free_not_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -56,7 +58,7 @@ void free_not_continue_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -96,15 +98,16 @@ TEST(OvercommitTracker, GlobalFreeNotContinue)
template <typename T>
void free_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -112,7 +115,7 @@ void free_continue_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -152,15 +155,16 @@ TEST(OvercommitTracker, GlobalFreeContinue)
template <typename T>
void free_continue_and_alloc_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -168,7 +172,7 @@ void free_continue_and_alloc_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -179,9 +183,10 @@ void free_continue_and_alloc_test(T & overcommit_tracker)
[&]()
{
MemoryTracker failed;
failed.setOvercommitWaitingTime(WAIT_TIME);
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100) != OvercommitResult::MEMORY_FREED;
}
).join();
@ -212,15 +217,16 @@ TEST(OvercommitTracker, GlobalFreeContinueAndAlloc)
template <typename T>
void free_continue_and_alloc_2_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -228,7 +234,7 @@ void free_continue_and_alloc_2_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -239,9 +245,10 @@ void free_continue_and_alloc_2_test(T & overcommit_tracker)
[&]()
{
MemoryTracker failed;
failed.setOvercommitWaitingTime(WAIT_TIME);
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100) != OvercommitResult::MEMORY_FREED;
}
));
@ -280,15 +287,16 @@ TEST(OvercommitTracker, GlobalFreeContinueAndAlloc2)
template <typename T>
void free_continue_and_alloc_3_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -296,7 +304,7 @@ void free_continue_and_alloc_3_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -307,9 +315,10 @@ void free_continue_and_alloc_3_test(T & overcommit_tracker)
[&]()
{
MemoryTracker failed;
failed.setOvercommitWaitingTime(WAIT_TIME);
std::this_thread::sleep_for(1000ms);
overcommit_tracker.tryContinueQueryExecutionAfterFree(5000);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100);
stopped_next = overcommit_tracker.needToStopQuery(&failed, 100) != OvercommitResult::MEMORY_FREED;
}
));
@ -348,15 +357,16 @@ TEST(OvercommitTracker, GlobalFreeContinueAndAlloc3)
template <typename T>
void free_continue_2_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
static constexpr size_t THREADS = 5;
std::vector<MemoryTracker> trackers(THREADS);
for (auto & tracker : trackers)
tracker.setOvercommitWaitingTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
std::vector<std::thread> threads;
threads.reserve(THREADS);
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
for (size_t i = 0; i < THREADS; ++i)
@ -364,7 +374,7 @@ void free_continue_2_test(T & overcommit_tracker)
threads.push_back(std::thread(
[&, i]()
{
if (overcommit_tracker.needToStopQuery(&trackers[i], 100))
if (overcommit_tracker.needToStopQuery(&trackers[i], 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
));
@ -404,18 +414,18 @@ TEST(OvercommitTracker, GlobalFreeContinue2)
template <typename T>
void query_stop_not_continue_test(T & overcommit_tracker)
{
overcommit_tracker.setMaxWaitTime(WAIT_TIME);
std::atomic<int> need_to_stop = 0;
MemoryTracker picked;
picked.setOvercommitWaitingTime(WAIT_TIME);
overcommit_tracker.setCandidate(&picked);
MemoryTracker another;
another.setOvercommitWaitingTime(WAIT_TIME);
auto thread = std::thread(
[&]()
{
if (overcommit_tracker.needToStopQuery(&another, 100))
if (overcommit_tracker.needToStopQuery(&another, 100) != OvercommitResult::MEMORY_FREED)
++need_to_stop;
}
);

View File

@ -1166,12 +1166,12 @@ KeeperStorage::ResponsesForSessions KeeperStorage::processRequest(const Coordina
? list_watches
: watches;
watches_type[zk_request->getPath()].emplace_back(session_id);
watches_type[zk_request->getPath()].emplace(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
else if (response->error == Coordination::Error::ZNONODE && zk_request->getOpNum() == Coordination::OpNum::Exists)
{
watches[zk_request->getPath()].emplace_back(session_id);
watches[zk_request->getPath()].emplace(session_id);
sessions_and_watchers[session_id].emplace(zk_request->getPath());
}
}
@ -1206,13 +1206,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
if (watch != watches.end())
{
auto & watches_for_path = watch->second;
for (auto w_it = watches_for_path.begin(); w_it != watches_for_path.end();)
{
if (*w_it == session_id)
w_it = watches_for_path.erase(w_it);
else
++w_it;
}
watches_for_path.erase(session_id);
if (watches_for_path.empty())
watches.erase(watch);
}
@ -1222,13 +1216,7 @@ void KeeperStorage::clearDeadWatches(int64_t session_id)
if (list_watch != list_watches.end())
{
auto & list_watches_for_path = list_watch->second;
for (auto w_it = list_watches_for_path.begin(); w_it != list_watches_for_path.end();)
{
if (*w_it == session_id)
w_it = list_watches_for_path.erase(w_it);
else
++w_it;
}
list_watches_for_path.erase(session_id);
if (list_watches_for_path.empty())
list_watches.erase(list_watch);
}
@ -1250,7 +1238,7 @@ void KeeperStorage::dumpWatches(WriteBufferFromOwnString & buf) const
void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
{
auto write_int_vec = [&buf](const std::vector<int64_t> & session_ids)
auto write_int_container = [&buf](const auto & session_ids)
{
for (int64_t session_id : session_ids)
{
@ -1261,13 +1249,13 @@ void KeeperStorage::dumpWatchesByPath(WriteBufferFromOwnString & buf) const
for (const auto & [watch_path, sessions] : watches)
{
buf << watch_path << "\n";
write_int_vec(sessions);
write_int_container(sessions);
}
for (const auto & [watch_path, sessions] : list_watches)
{
buf << watch_path << "\n";
write_int_vec(sessions);
write_int_container(sessions);
}
}

View File

@ -96,7 +96,7 @@ public:
using Container = SnapshotableHashTable<Node>;
using Ephemerals = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionAndWatcher = std::unordered_map<int64_t, std::unordered_set<std::string>>;
using SessionIDs = std::vector<int64_t>;
using SessionIDs = std::unordered_set<int64_t>;
/// Just vector of SHA1 from user:password
using AuthIDs = std::vector<AuthID>;

View File

@ -1,5 +1,6 @@
#pragma once
#include <unordered_set>
#include <vector>
@ -7,6 +8,8 @@ namespace DB
{
using ColumnNumbers = std::vector<size_t>;
using ColumnNumbersSet = std::unordered_set<size_t>;
using ColumnNumbersList = std::vector<ColumnNumbers>;
using ColumnNumbersSetList = std::vector<ColumnNumbersSet>;
}

View File

@ -16,6 +16,7 @@ using NameOrderedSet = std::set<std::string>;
using NameToNameMap = std::unordered_map<std::string, std::string>;
using NameToNameSetMap = std::unordered_map<std::string, NameSet>;
using NameToNameVector = std::vector<std::pair<std::string, std::string>>;
using NameToIndexMap = std::unordered_map<std::string, size_t>;
using NameWithAlias = std::pair<std::string, std::string>;
using NamesWithAliases = std::vector<NameWithAlias>;

View File

@ -1,3 +1,4 @@
#include <cstddef>
#include <Core/NamesAndTypes.h>
#include <base/sort.h>
@ -214,4 +215,17 @@ std::optional<NameAndTypePair> NamesAndTypesList::tryGetByName(const std::string
}
return {};
}
size_t NamesAndTypesList::getPosByName(const std::string &name) const noexcept
{
size_t pos = 0;
for (const NameAndTypePair & column : *this)
{
if (column.name == name)
break;
++pos;
}
return pos;
}
}

View File

@ -105,8 +105,11 @@ public:
/// Check that column contains in list
bool contains(const String & name) const;
/// Try to get column by name, return empty optional if column not found
/// Try to get column by name, returns empty optional if column not found
std::optional<NameAndTypePair> tryGetByName(const std::string & name) const;
/// Try to get column position by name, returns number of columns if column isn't found
size_t getPosByName(const std::string & name) const noexcept;
};
using NamesAndTypesLists = std::vector<NamesAndTypesList>;

View File

@ -372,7 +372,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(UInt64, memory_profiler_step, (4 * 1024 * 1024), "Whenever query memory usage becomes larger than every next step in number of bytes the memory profiler will collect the allocating stack trace. Zero means disabled memory profiler. Values lower than a few megabytes will slow down query processing.", 0) \
M(Float, memory_profiler_sample_probability, 0., "Collect random allocations and deallocations and write them into system.trace_log with 'MemorySample' trace_type. The probability is for every alloc/free regardless to the size of the allocation. Note that sampling happens only when the amount of untracked memory exceeds 'max_untracked_memory'. You may want to set 'max_untracked_memory' to 0 for extra fine grained sampling.", 0) \
\
M(UInt64, memory_usage_overcommit_max_wait_microseconds, 200, "Maximum time thread will wait for memory to be freed in the case of memory overcommit on user level. If timeout is reached and memory is not freed, exception is thrown.", 0) \
M(UInt64, memory_usage_overcommit_max_wait_microseconds, 5'000'000, "Maximum time thread will wait for memory to be freed in the case of memory overcommit. If timeout is reached and memory is not freed, exception is thrown.", 0) \
\
M(UInt64, max_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for a query. Zero means unlimited.", 0) \
M(UInt64, max_network_bytes, 0, "The maximum number of bytes (compressed) to receive or transmit over the network for execution of the query.", 0) \
@ -567,7 +567,7 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
\
M(UInt64, remote_fs_read_max_backoff_ms, 10000, "Max wait time when trying to read data for remote disk", 0) \
M(UInt64, remote_fs_read_backoff_max_tries, 5, "Max attempts to read with backoff", 0) \
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must me done via disk config), but allows to bypass cache for some queries if intended", 0) \
M(Bool, enable_filesystem_cache, true, "Use cache for remote filesystem. This setting does not turn on/off cache for disks (must be done via disk config), but allows to bypass cache for some queries if intended", 0) \
M(UInt64, filesystem_cache_max_wait_sec, 5, "Allow to wait at most this number of seconds for download of current remote_fs_buffer_size bytes, and skip cache if exceeded", 0) \
M(Bool, enable_filesystem_cache_on_write_operations, false, "Write into cache on write operations. To actually work this setting requires be added to disk config too", 0) \
M(Bool, enable_filesystem_cache_log, false, "Allows to record the filesystem caching log for each query", 0) \
@ -674,6 +674,8 @@ static constexpr UInt64 operator""_GiB(unsigned long long value)
M(Bool, input_format_json_read_bools_as_numbers, true, "Allow to parse bools as numbers in JSON input formats", 0) \
M(Bool, input_format_protobuf_flatten_google_wrappers, false, "Enable Google wrappers for regular non-nested columns, e.g. google.protobuf.StringValue 'str' for String column 'str'. For Nullable columns empty wrappers are recognized as defaults, and missing as nulls", 0) \
M(Bool, output_format_protobuf_nullables_with_google_wrappers, false, "When serializing Nullable columns with Google wrappers, serialize default values as empty wrappers. If turned off, default and null values are not serialized", 0) \
M(UInt64, input_format_csv_skip_first_lines, 0, "Skip specified number of lines at the beginning of data in CSV format", 0) \
M(UInt64, input_format_tsv_skip_first_lines, 0, "Skip specified number of lines at the beginning of data in TSV format", 0) \
\
M(DateTimeInputFormat, date_time_input_format, FormatSettings::DateTimeInputFormat::Basic, "Method to read DateTime from text input formats. Possible values: 'basic', 'best_effort' and 'best_effort_us'.", 0) \
M(DateTimeOutputFormat, date_time_output_format, FormatSettings::DateTimeOutputFormat::Simple, "Method to write DateTime to text output. Possible values: 'simple', 'iso', 'unix_timestamp'.", 0) \

View File

@ -9,6 +9,8 @@
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string/predicate.hpp>
#include <cmath>
namespace DB
{
@ -16,6 +18,7 @@ namespace ErrorCodes
{
extern const int SIZE_OF_FIXED_STRING_DOESNT_MATCH;
extern const int CANNOT_PARSE_BOOL;
extern const int CANNOT_PARSE_NUMBER;
}
@ -176,27 +179,75 @@ UInt64 SettingFieldMaxThreads::getAuto()
return getNumberOfPhysicalCPUCores();
}
namespace
{
Poco::Timespan::TimeDiff float64AsSecondsToTimespan(Float64 d)
{
if (d != 0.0 && !std::isnormal(d))
throw Exception(
ErrorCodes::CANNOT_PARSE_NUMBER, "A setting's value in seconds must be a normal floating point number or zero. Got {}", d);
return static_cast<Poco::Timespan::TimeDiff>(d * 1000000);
}
template <SettingFieldTimespanUnit unit_>
SettingFieldTimespan<unit_>::SettingFieldTimespan(const Field & f) : SettingFieldTimespan(fieldToNumber<UInt64>(f))
}
template <>
SettingFieldSeconds::SettingFieldTimespan(const Field & f) : SettingFieldTimespan(float64AsSecondsToTimespan(fieldToNumber<Float64>(f)))
{
}
template <SettingFieldTimespanUnit unit_>
SettingFieldTimespan<unit_> & SettingFieldTimespan<unit_>::operator=(const Field & f)
template <>
SettingFieldMilliseconds::SettingFieldTimespan(const Field & f) : SettingFieldTimespan(fieldToNumber<UInt64>(f))
{
}
template <>
SettingFieldTimespan<SettingFieldTimespanUnit::Second> & SettingFieldSeconds::operator=(const Field & f)
{
*this = Poco::Timespan{float64AsSecondsToTimespan(fieldToNumber<Float64>(f))};
return *this;
}
template <>
SettingFieldTimespan<SettingFieldTimespanUnit::Millisecond> & SettingFieldMilliseconds::operator=(const Field & f)
{
*this = fieldToNumber<UInt64>(f);
return *this;
}
template <SettingFieldTimespanUnit unit_>
String SettingFieldTimespan<unit_>::toString() const
template <>
String SettingFieldSeconds::toString() const
{
return ::DB::toString(static_cast<Float64>(value.totalMicroseconds()) / microseconds_per_unit);
}
template <>
String SettingFieldMilliseconds::toString() const
{
return ::DB::toString(operator UInt64());
}
template <SettingFieldTimespanUnit unit_>
void SettingFieldTimespan<unit_>::parseFromString(const String & str)
template <>
SettingFieldSeconds::operator Field() const
{
return static_cast<Float64>(value.totalMicroseconds()) / microseconds_per_unit;
}
template <>
SettingFieldMilliseconds::operator Field() const
{
return operator UInt64();
}
template <>
void SettingFieldSeconds::parseFromString(const String & str)
{
Float64 n = parse<Float64>(str.data(), str.size());
*this = Poco::Timespan{static_cast<Poco::Timespan::TimeDiff>(n * microseconds_per_unit)};
}
template <>
void SettingFieldMilliseconds::parseFromString(const String & str)
{
*this = stringToNumber<UInt64>(str);
}
@ -204,6 +255,13 @@ void SettingFieldTimespan<unit_>::parseFromString(const String & str)
template <SettingFieldTimespanUnit unit_>
void SettingFieldTimespan<unit_>::writeBinary(WriteBuffer & out) const
{
/// Note that this returns an UInt64 (for both seconds and milliseconds units) for compatibility reasons as the value
/// for seconds used to be a integer (now a Float64)
/// This method is only used to communicate with clients or servers older than DBMS_MIN_REVISION_WITH_SETTINGS_SERIALIZED_AS_STRINGS
/// in which the value was passed as binary (as a UInt64)
/// Later versions pass the setting values as String (using toString() and parseFromString()) and there passing "1.2" will
/// lead to `1` on releases with integer seconds or `1.2` on more recent releases
/// See https://github.com/ClickHouse/ClickHouse/issues/36940 for more details
auto num_units = operator UInt64();
writeVarUInt(num_units, out);
}

View File

@ -124,7 +124,7 @@ struct SettingFieldTimespan
operator std::chrono::duration<Rep, Period>() const { return std::chrono::duration_cast<std::chrono::duration<Rep, Period>>(std::chrono::microseconds(value.totalMicroseconds())); } /// NOLINT
explicit operator UInt64() const { return value.totalMicroseconds() / microseconds_per_unit; }
explicit operator Field() const { return operator UInt64(); }
explicit operator Field() const;
Poco::Timespan::TimeDiff totalMicroseconds() const { return value.totalMicroseconds(); }
Poco::Timespan::TimeDiff totalMilliseconds() const { return value.totalMilliseconds(); }

View File

@ -222,11 +222,7 @@ struct SimpleSortCursor : SortCursorHelper<SimpleSortCursor>
bool ALWAYS_INLINE greaterAt(const SimpleSortCursor & rhs, size_t lhs_pos, size_t rhs_pos) const
{
const auto & desc = impl->desc[0];
int direction = desc.direction;
int nulls_direction = desc.nulls_direction;
bool result = false;
int res = 0;
#if USE_EMBEDDED_COMPILER
if (impl->desc.compiled_sort_description && rhs.impl->desc.compiled_sort_description)
@ -234,17 +230,23 @@ struct SimpleSortCursor : SortCursorHelper<SimpleSortCursor>
assert(impl->raw_sort_columns_data.size() == rhs.impl->raw_sort_columns_data.size());
auto sort_description_func_typed = reinterpret_cast<JITSortDescriptionFunc>(impl->desc.compiled_sort_description);
int jit_result = sort_description_func_typed(lhs_pos, rhs_pos, impl->raw_sort_columns_data.data(), rhs.impl->raw_sort_columns_data.data()); /// NOLINT
result = jit_result > 0;
res = sort_description_func_typed(lhs_pos, rhs_pos, impl->raw_sort_columns_data.data(), rhs.impl->raw_sort_columns_data.data()); /// NOLINT
}
else
#endif
{
int non_jit_result = impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction);
result = (non_jit_result != 0 && ((non_jit_result > 0) == (direction > 0)));
const auto & desc = impl->desc[0];
int direction = desc.direction;
int nulls_direction = desc.nulls_direction;
res = direction * impl->sort_columns[0]->compareAt(lhs_pos, rhs_pos, *(rhs.impl->sort_columns[0]), nulls_direction);
}
return result;
if (res > 0)
return true;
if (res < 0)
return false;
return impl->order > rhs.impl->order;
}
};

View File

@ -8,6 +8,7 @@
#include <IO/WriteHelpers.h>
#include <Processors/Formats/IInputFormat.h>
#include <Interpreters/Context.h>
#include <Storages/ExternalDataSourceConfiguration.h>
#include <Poco/Net/HTTPRequest.h>
#include <Common/logger_useful.h>
#include "DictionarySourceFactory.h"
@ -228,12 +229,32 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
if (dict_struct.has_expressions)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Dictionary source of type `http` does not support attribute expressions");
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
const auto & settings_config_prefix = config_prefix + ".http";
const auto & credentials_prefix = settings_config_prefix + ".credentials";
auto settings_config_prefix = config_prefix + ".http";
Poco::Net::HTTPBasicCredentials credentials;
ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
String url;
String endpoint;
String format;
auto named_collection = created_from_ddl
? getURLBasedDataSourceConfiguration(config, settings_config_prefix, global_context)
: std::nullopt;
if (named_collection)
{
url = named_collection->configuration.url;
endpoint = named_collection->configuration.endpoint;
format = named_collection->configuration.format;
credentials.setUsername(named_collection->configuration.user);
credentials.setPassword(named_collection->configuration.password);
header_entries.reserve(named_collection->configuration.headers.size());
for (const auto & header : named_collection->configuration.headers)
header_entries.emplace_back(std::make_tuple(header.first, header.second.get<String>()));
}
else
{
const auto & credentials_prefix = settings_config_prefix + ".credentials";
if (config.has(credentials_prefix))
{
@ -242,7 +263,7 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
}
const auto & headers_prefix = settings_config_prefix + ".headers";
ReadWriteBufferFromHTTP::HTTPHeaderEntries header_entries;
if (config.has(headers_prefix))
{
@ -258,15 +279,30 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
}
}
url = config.getString(settings_config_prefix + ".url", "");
endpoint = config.getString(settings_config_prefix + ".endpoint", "");
format =config.getString(settings_config_prefix + ".format", "");
}
if (url.ends_with('/'))
{
if (endpoint.starts_with('/'))
url.pop_back();
}
else if (!endpoint.empty() && !endpoint.starts_with('/'))
url.push_back('/');
auto configuration = HTTPDictionarySource::Configuration
{
.url = config.getString(settings_config_prefix + ".url", ""),
.format =config.getString(settings_config_prefix + ".format", ""),
.url = url + endpoint,
.format = format,
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.header_entries = std::move(header_entries) //-V1030
};
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context, created_from_ddl);
};
factory.registerSource("http", create_table_source);

View File

@ -27,6 +27,8 @@ namespace ErrorCodes
extern const int FILE_ALREADY_EXISTS;
extern const int FILE_DOESNT_EXIST;
extern const int BAD_FILE_TYPE;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
}
static String revisionToString(UInt64 revision)
@ -122,10 +124,10 @@ DiskObjectStorage::Metadata DiskObjectStorage::readUpdateAndStoreMetadata(const
}
DiskObjectStorage::Metadata DiskObjectStorage::readUpdateStoreMetadataAndRemove(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
void DiskObjectStorage::readUpdateStoreMetadataAndRemove(const String & path, bool sync, DiskObjectStorage::MetadataUpdater updater)
{
std::unique_lock lock(metadata_mutex);
return Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
Metadata::readUpdateStoreMetadataAndRemove(remote_fs_root_path, metadata_disk, path, sync, updater);
}
DiskObjectStorage::Metadata DiskObjectStorage::readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, DiskObjectStorage::MetadataUpdater updater)
@ -174,8 +176,13 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
}
catch (const Exception & e)
{
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST)
/// Unfortunately in rare cases it can happen when files disappear
/// or can be empty in case of operation interruption (like cancelled metadata fetch)
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
return;
throw;
}
}
@ -186,6 +193,15 @@ void DiskObjectStorage::getRemotePathsRecursive(const String & local_path, std::
{
it = iterateDirectory(local_path);
}
catch (const Exception & e)
{
/// Unfortunately in rare cases it can happen when files disappear
/// or can be empty in case of operation interruption (like cancelled metadata fetch)
if (e.code() == ErrorCodes::FILE_DOESNT_EXIST ||
e.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF ||
e.code() == ErrorCodes::CANNOT_READ_ALL_DATA)
return;
}
catch (const fs::filesystem_error & e)
{
if (e.code() == std::errc::no_such_file_or_directory)
@ -237,8 +253,11 @@ void DiskObjectStorage::moveFile(const String & from_path, const String & to_pat
metadata_helper->createFileOperationObject("rename", revision, object_metadata);
}
{
std::unique_lock lock(metadata_mutex);
metadata_disk->moveFile(from_path, to_path);
}
}
void DiskObjectStorage::moveFile(const String & from_path, const String & to_path)
{
@ -449,6 +468,8 @@ void DiskObjectStorage::removeMetadata(const String & path, std::vector<String>
LOG_WARNING(log,
"Metadata file {} can't be read by reason: {}. Removing it forcibly.",
backQuote(path), e.nested() ? e.nested()->message() : e.message());
std::unique_lock lock(metadata_mutex);
metadata_disk->removeFile(path);
}
else

View File

@ -65,7 +65,7 @@ public:
Metadata readMetadata(const String & path) const;
Metadata readMetadataUnlocked(const String & path, std::shared_lock<std::shared_mutex> &) const;
Metadata readUpdateAndStoreMetadata(const String & path, bool sync, MetadataUpdater updater);
Metadata readUpdateStoreMetadataAndRemove(const String & path, bool sync, MetadataUpdater updater);
void readUpdateStoreMetadataAndRemove(const String & path, bool sync, MetadataUpdater updater);
Metadata readOrCreateUpdateAndStoreMetadata(const String & path, WriteMode mode, bool sync, MetadataUpdater updater);

View File

@ -10,7 +10,10 @@ namespace ErrorCodes
{
extern const int UNKNOWN_FORMAT;
extern const int PATH_ACCESS_DENIED;
extern const int MEMORY_LIMIT_EXCEEDED;
extern const int FILE_DOESNT_EXIST;
extern const int ATTEMPT_TO_READ_AFTER_EOF;
extern const int CANNOT_READ_ALL_DATA;
extern const int CANNOT_OPEN_FILE;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_)
@ -45,16 +48,38 @@ DiskObjectStorageMetadata DiskObjectStorageMetadata::createUpdateAndStoreMetadat
return result;
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
void DiskObjectStorageMetadata::readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, DiskObjectStorageMetadataUpdater updater)
{
DiskObjectStorageMetadata result(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
result.load();
if (updater(result))
result.save(sync);
/// Very often we are deleting metadata from some unfinished operation (like fetch of metadata)
/// in this case metadata file can be incomplete/empty and so on. It's ok to remove it in this case
/// because we cannot do anything better.
try
{
DiskObjectStorageMetadata metadata(remote_fs_root_path_, metadata_disk_, metadata_file_path_);
metadata.load();
if (updater(metadata))
metadata.save(sync);
metadata_disk_->removeFile(metadata_file_path_);
}
catch (Exception & ex)
{
/// If we have some broken half-empty file just remove it
if (ex.code() == ErrorCodes::ATTEMPT_TO_READ_AFTER_EOF
|| ex.code() == ErrorCodes::CANNOT_READ_ALL_DATA
|| ex.code() == ErrorCodes::CANNOT_OPEN_FILE)
{
LOG_INFO(&Poco::Logger::get("ObjectStorageMetadata"), "Failed to read metadata file {} before removal because it's incomplete or empty. "
"It's Ok and can happen after operation interruption (like metadata fetch), so removing as is", metadata_file_path_);
metadata_disk_->removeFile(metadata_file_path_);
}
return result;
/// If file already removed, than nothing to do
if (ex.code() == ErrorCodes::FILE_DOESNT_EXIST)
return;
throw;
}
}
DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNotExists(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, bool overwrite)
@ -73,8 +98,6 @@ DiskObjectStorageMetadata DiskObjectStorageMetadata::createAndStoreMetadataIfNot
}
void DiskObjectStorageMetadata::load()
{
try
{
const ReadSettings read_settings;
auto buf = metadata_disk->readFile(metadata_file_path, read_settings, 1024); /* reasonable buffer size for small file */
@ -127,19 +150,6 @@ void DiskObjectStorageMetadata::load()
assertChar('\n', *buf);
}
}
catch (Exception & e)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
if (e.code() == ErrorCodes::UNKNOWN_FORMAT)
throw;
if (e.code() == ErrorCodes::MEMORY_LIMIT_EXCEEDED)
throw;
throw Exception("Failed to read metadata file: " + metadata_file_path, ErrorCodes::UNKNOWN_FORMAT);
}
}
/// Load metadata by path or create empty if `create` flag is set.
DiskObjectStorageMetadata::DiskObjectStorageMetadata(

View File

@ -47,7 +47,7 @@ struct DiskObjectStorageMetadata
static DiskObjectStorageMetadata readMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_);
static DiskObjectStorageMetadata readUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static void readUpdateStoreMetadataAndRemove(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);
static DiskObjectStorageMetadata createAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync);
static DiskObjectStorageMetadata createUpdateAndStoreMetadata(const String & remote_fs_root_path_, DiskPtr metadata_disk_, const String & metadata_file_path_, bool sync, Updater updater);

View File

@ -66,6 +66,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.csv.null_representation = settings.format_csv_null_representation;
format_settings.csv.input_format_arrays_as_nested_csv = settings.input_format_csv_arrays_as_nested_csv;
format_settings.csv.input_format_use_best_effort_in_schema_inference = settings.input_format_csv_use_best_effort_in_schema_inference;
format_settings.csv.skip_first_lines = settings.input_format_csv_skip_first_lines;
format_settings.hive_text.fields_delimiter = settings.input_format_hive_text_fields_delimiter;
format_settings.hive_text.collection_items_delimiter = settings.input_format_hive_text_collection_items_delimiter;
format_settings.hive_text.map_keys_delimiter = settings.input_format_hive_text_map_keys_delimiter;
@ -123,6 +124,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.tsv.input_format_enum_as_number = settings.input_format_tsv_enum_as_number;
format_settings.tsv.null_representation = settings.format_tsv_null_representation;
format_settings.tsv.input_format_use_best_effort_in_schema_inference = settings.input_format_tsv_use_best_effort_in_schema_inference;
format_settings.tsv.skip_first_lines = settings.input_format_tsv_skip_first_lines;
format_settings.values.accurate_types_of_literals = settings.input_format_values_accurate_types_of_literals;
format_settings.values.deduce_templates_of_expressions = settings.input_format_values_deduce_templates_of_expressions;
format_settings.values.interpret_expressions = settings.input_format_values_interpret_expressions;
@ -584,6 +586,13 @@ bool FormatFactory::checkIfFormatHasAnySchemaReader(const String & name) const
return checkIfFormatHasSchemaReader(name) || checkIfFormatHasExternalSchemaReader(name);
}
void FormatFactory::checkFormatName(const String & name) const
{
auto it = dict.find(name);
if (it == dict.end())
throw Exception("Unknown format " + name, ErrorCodes::UNKNOWN_FORMAT);
}
FormatFactory & FormatFactory::instance()
{
static FormatFactory ret;

View File

@ -210,6 +210,9 @@ public:
bool isInputFormat(const String & name) const;
bool isOutputFormat(const String & name) const;
/// Check that format with specified name exists and throw an exception otherwise.
void checkFormatName(const String & name) const;
private:
FormatsDictionary dict;
FileExtensionFormats file_extension_formats;

View File

@ -109,6 +109,7 @@ struct FormatSettings
String null_representation = "\\N";
char tuple_delimiter = ',';
bool input_format_use_best_effort_in_schema_inference = true;
UInt64 skip_first_lines = 0;
} csv;
struct HiveText
@ -219,6 +220,7 @@ struct FormatSettings
String null_representation = "\\N";
bool input_format_enum_as_number = false;
bool input_format_use_best_effort_in_schema_inference = true;
UInt64 skip_first_lines = 0;
} tsv;
struct

View File

@ -0,0 +1,12 @@
#include "FunctionShowCertificate.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
void registerFunctionShowCertificate(FunctionFactory & factory)
{
factory.registerFunction<FunctionShowCertificate>();
}
}

View File

@ -0,0 +1,189 @@
#pragma once
#include <Common/config.h>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context.h>
#if USE_SSL
#include <openssl/x509v3.h>
#include "Poco/Net/SSLManager.h"
#include "Poco/Crypto/X509Certificate.h"
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
// showCertificate()
class FunctionShowCertificate : public IFunction
{
public:
static constexpr auto name = "showCertificate";
static FunctionPtr create(ContextPtr)
{
#if !defined(USE_SSL) || USE_SSL == 0
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support is disabled");
#endif
return std::make_shared<FunctionShowCertificate>();
}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo &) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName &) const override
{
return std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>());
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
MutableColumnPtr keys = DataTypeString().createColumn();
MutableColumnPtr values = DataTypeString().createColumn();
MutableColumnPtr offsets = DataTypeNumber<IColumn::Offset>().createColumn();
if (input_rows_count)
{
#if USE_SSL
if (const X509 * cert = SSL_CTX_get0_certificate(Poco::Net::SSLManager::instance().defaultServerContext()->sslContext()))
{
BIO * b = BIO_new(BIO_s_mem());
SCOPE_EXIT(
{
BIO_free(b);
});
keys->insert("version");
values->insert(std::to_string(X509_get_version(cert) + 1));
{
char buf[1024] = {0};
const ASN1_INTEGER * sn = cert->cert_info->serialNumber;
BIGNUM * bnsn = ASN1_INTEGER_to_BN(sn, nullptr);
SCOPE_EXIT(
{
BN_free(bnsn);
});
if (BN_print(b, bnsn) > 0 && BIO_read(b, buf, sizeof(buf)) > 0)
{
keys->insert("serial_number");
values->insert(buf);
}
}
{
const ASN1_BIT_STRING *sig = nullptr;
const X509_ALGOR *al = nullptr;
char buf[1024] = {0};
X509_get0_signature(&sig, &al, cert);
if (al)
{
OBJ_obj2txt(buf, sizeof(buf), al->algorithm, 0);
keys->insert("signature_algo");
values->insert(buf);
}
}
char * issuer = X509_NAME_oneline(cert->cert_info->issuer, nullptr, 0);
if (issuer)
{
SCOPE_EXIT(
{
OPENSSL_free(issuer);
});
keys->insert("issuer");
values->insert(issuer);
}
{
char buf[1024] = {0};
if (ASN1_TIME_print(b, X509_get_notBefore(cert)) && BIO_read(b, buf, sizeof(buf)) > 0)
{
keys->insert("not_before");
values->insert(buf);
}
}
{
char buf[1024] = {0};
if (ASN1_TIME_print(b, X509_get_notAfter(cert)) && BIO_read(b, buf, sizeof(buf)) > 0)
{
keys->insert("not_after");
values->insert(buf);
}
}
char * subject = X509_NAME_oneline(cert->cert_info->subject, nullptr, 0);
if (subject)
{
SCOPE_EXIT(
{
OPENSSL_free(subject);
});
keys->insert("subject");
values->insert(subject);
}
if (X509_PUBKEY * pkey = X509_get_X509_PUBKEY(cert))
{
char buf[1024] = {0};
ASN1_OBJECT *ppkalg = nullptr;
const unsigned char *pk = nullptr;
int ppklen = 0;
X509_ALGOR *pa = nullptr;
if (X509_PUBKEY_get0_param(&ppkalg, &pk, &ppklen, &pa, pkey) &&
i2a_ASN1_OBJECT(b, ppkalg) > 0 && BIO_read(b, buf, sizeof(buf)) > 0)
{
keys->insert("pkey_algo");
values->insert(buf);
}
}
}
offsets->insert(keys->size());
#endif
}
size_t sz = keys->size();
if (sz && input_rows_count > 1)
{
keys->reserve(sz * input_rows_count);
values->reserve(sz * input_rows_count);
offsets->reserve(input_rows_count);
}
for (size_t i = 1; i < input_rows_count; ++i)
{
for (size_t j = 0; j < sz; ++j)
{
keys->insertFrom(*keys, j);
values->insertFrom(*values, j);
}
offsets->insert(keys->size());
}
auto nested_column = ColumnArray::create(
ColumnTuple::create(Columns{std::move(keys), std::move(values)}), std::move(offsets));
return ColumnMap::create(nested_column);
}
};
}

View File

@ -448,7 +448,7 @@ public:
class SplitByRegexpImpl
{
private:
Regexps::Pool::Pointer re;
Regexps::RegexpPtr re;
OptimizedRegularExpression::MatchVec matches;
Pos pos;
@ -477,7 +477,7 @@ public:
ErrorCodes::ILLEGAL_COLUMN);
if (!col->getValue<String>().empty())
re = Regexps::get<false, false, false>(col->getValue<String>());
re = std::make_shared<Regexps::Regexp>(Regexps::createRegexp<false, false, false>(col->getValue<String>()));
}
@ -532,7 +532,7 @@ public:
class ExtractAllImpl
{
private:
Regexps::Pool::Pointer re;
Regexps::RegexpPtr re;
OptimizedRegularExpression::MatchVec matches;
size_t capture;
@ -560,7 +560,7 @@ public:
+ " of first argument of function " + getName() + ". Must be constant string.",
ErrorCodes::ILLEGAL_COLUMN);
re = Regexps::get<false, false, false>(col->getValue<String>());
re = std::make_shared<Regexps::Regexp>(Regexps::createRegexp<false, false, false>(col->getValue<String>()));
capture = re->getNumberOfSubpatterns() > 0 ? 1 : 0;
matches.resize(capture + 1);

View File

@ -18,8 +18,6 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int LOGICAL_ERROR;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace impl
@ -112,16 +110,14 @@ struct MatchImpl
const ColumnString::Chars & haystack_data,
const ColumnString::Offsets & haystack_offsets,
const String & needle,
const ColumnPtr & start_pos_,
[[maybe_unused]] const ColumnPtr & start_pos_,
PaddedPODArray<UInt8> & res)
{
const size_t haystack_size = haystack_offsets.size();
if (haystack_size != res.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Function '{}' unexpectedly received a different number of haystacks and results", name);
assert(haystack_size == res.size());
if (start_pos_ != nullptr)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function '{}' doesn't support start_pos argument", name);
assert(start_pos_ == nullptr);
if (haystack_offsets.empty())
return;
@ -166,17 +162,17 @@ struct MatchImpl
}
else
{
auto regexp = Regexps::get<is_like, true, case_insensitive>(needle);
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like, /*no_capture*/ true, case_insensitive>(needle));
String required_substring;
bool is_trivial;
bool required_substring_is_prefix; /// for `anchored` execution of the regexp.
regexp->getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix);
regexp.getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix);
if (required_substring.empty())
{
if (!regexp->getRE2()) /// An empty regexp. Always matches.
if (!regexp.getRE2()) /// An empty regexp. Always matches.
{
if (haystack_size)
memset(res.data(), !negate, haystack_size * sizeof(res[0]));
@ -186,7 +182,7 @@ struct MatchImpl
size_t prev_offset = 0;
for (size_t i = 0; i < haystack_size; ++i)
{
const bool match = regexp->getRE2()->Match(
const bool match = regexp.getRE2()->Match(
{reinterpret_cast<const char *>(&haystack_data[prev_offset]), haystack_offsets[i] - prev_offset - 1},
0,
haystack_offsets[i] - prev_offset - 1,
@ -241,7 +237,7 @@ struct MatchImpl
const size_t start_pos = (required_substring_is_prefix) ? (reinterpret_cast<const char *>(pos) - str_data) : 0;
const size_t end_pos = str_size;
const bool match = regexp->getRE2()->Match(
const bool match = regexp.getRE2()->Match(
{str_data, str_size},
start_pos,
end_pos,
@ -274,8 +270,7 @@ struct MatchImpl
{
const size_t haystack_size = haystack.size() / N;
if (haystack_size != res.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Function '{}' unexpectedly received a different number of haystacks and results", name);
assert(haystack_size == res.size());
if (haystack.empty())
return;
@ -325,17 +320,17 @@ struct MatchImpl
}
else
{
auto regexp = Regexps::get<is_like, true, case_insensitive>(needle);
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like, /*no_capture*/ true, case_insensitive>(needle));
String required_substring;
bool is_trivial;
bool required_substring_is_prefix; /// for `anchored` execution of the regexp.
regexp->getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix);
regexp.getAnalyzeResult(required_substring, is_trivial, required_substring_is_prefix);
if (required_substring.empty())
{
if (!regexp->getRE2()) /// An empty regexp. Always matches.
if (!regexp.getRE2()) /// An empty regexp. Always matches.
{
if (haystack_size)
memset(res.data(), !negate, haystack_size * sizeof(res[0]));
@ -345,7 +340,7 @@ struct MatchImpl
size_t offset = 0;
for (size_t i = 0; i < haystack_size; ++i)
{
const bool match = regexp->getRE2()->Match(
const bool match = regexp.getRE2()->Match(
{reinterpret_cast<const char *>(&haystack[offset]), N},
0,
N,
@ -403,7 +398,7 @@ struct MatchImpl
const size_t start_pos = (required_substring_is_prefix) ? (reinterpret_cast<const char *>(pos) - str_data) : 0;
const size_t end_pos = N;
const bool match = regexp->getRE2()->Match(
const bool match = regexp.getRE2()->Match(
{str_data, N},
start_pos,
end_pos,
@ -433,16 +428,15 @@ struct MatchImpl
const ColumnString::Offsets & haystack_offsets,
const ColumnString::Chars & needle_data,
const ColumnString::Offsets & needle_offset,
const ColumnPtr & start_pos_,
[[maybe_unused]] const ColumnPtr & start_pos_,
PaddedPODArray<UInt8> & res)
{
const size_t haystack_size = haystack_offsets.size();
if (haystack_size != needle_offset.size() || haystack_size != res.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Function '{}' unexpectedly received a different number of haystacks, needles and results", name);
assert(haystack_size == needle_offset.size());
assert(haystack_size == res.size());
if (start_pos_ != nullptr)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function '{}' doesn't support start_pos argument", name);
assert(start_pos_ == nullptr);
if (haystack_offsets.empty())
return;
@ -454,6 +448,9 @@ struct MatchImpl
size_t prev_haystack_offset = 0;
size_t prev_needle_offset = 0;
Regexps::LocalCacheTable cache;
Regexps::RegexpPtr regexp;
for (size_t i = 0; i < haystack_size; ++i)
{
const auto * const cur_haystack_data = &haystack_data[prev_haystack_offset];
@ -479,22 +476,19 @@ struct MatchImpl
}
else
{
// each row is expected to contain a different like/re2 pattern
// --> bypass the regexp cache, instead construct the pattern on-the-fly
const int flags = Regexps::buildRe2Flags</*no_capture*/ true, case_insensitive>();
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like>(needle, flags));
cache.getOrSet<is_like, /*no_capture*/ true, case_insensitive>(needle, regexp);
regexp.getAnalyzeResult(required_substr, is_trivial, required_substring_is_prefix);
regexp->getAnalyzeResult(required_substr, is_trivial, required_substring_is_prefix);
if (required_substr.empty())
{
if (!regexp.getRE2()) /// An empty regexp. Always matches.
if (!regexp->getRE2()) /// An empty regexp. Always matches.
{
res[i] = !negate;
}
else
{
const bool match = regexp.getRE2()->Match(
const bool match = regexp->getRE2()->Match(
{reinterpret_cast<const char *>(cur_haystack_data), cur_haystack_length},
0,
cur_haystack_length,
@ -524,7 +518,7 @@ struct MatchImpl
const size_t start_pos = (required_substring_is_prefix) ? (match - cur_haystack_data) : 0;
const size_t end_pos = cur_haystack_length;
const bool match2 = regexp.getRE2()->Match(
const bool match2 = regexp->getRE2()->Match(
{reinterpret_cast<const char *>(cur_haystack_data), cur_haystack_length},
start_pos,
end_pos,
@ -547,16 +541,15 @@ struct MatchImpl
size_t N,
const ColumnString::Chars & needle_data,
const ColumnString::Offsets & needle_offset,
const ColumnPtr & start_pos_,
[[maybe_unused]] const ColumnPtr & start_pos_,
PaddedPODArray<UInt8> & res)
{
const size_t haystack_size = haystack.size()/N;
if (haystack_size != needle_offset.size() || haystack_size != res.size())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Function '{}' unexpectedly received a different number of haystacks, needles and results", name);
assert(haystack_size == needle_offset.size());
assert(haystack_size == res.size());
if (start_pos_ != nullptr)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function '{}' doesn't support start_pos argument", name);
assert(start_pos_ == nullptr);
if (haystack.empty())
return;
@ -568,6 +561,9 @@ struct MatchImpl
size_t prev_haystack_offset = 0;
size_t prev_needle_offset = 0;
Regexps::LocalCacheTable cache;
Regexps::RegexpPtr regexp;
for (size_t i = 0; i < haystack_size; ++i)
{
const auto * const cur_haystack_data = &haystack[prev_haystack_offset];
@ -593,22 +589,19 @@ struct MatchImpl
}
else
{
// each row is expected to contain a different like/re2 pattern
// --> bypass the regexp cache, instead construct the pattern on-the-fly
const int flags = Regexps::buildRe2Flags</*no_capture*/ true, case_insensitive>();
const auto & regexp = Regexps::Regexp(Regexps::createRegexp<is_like>(needle, flags));
cache.getOrSet<is_like, /*no_capture*/ true, case_insensitive>(needle, regexp);
regexp.getAnalyzeResult(required_substr, is_trivial, required_substring_is_prefix);
regexp->getAnalyzeResult(required_substr, is_trivial, required_substring_is_prefix);
if (required_substr.empty())
{
if (!regexp.getRE2()) /// An empty regexp. Always matches.
if (!regexp->getRE2()) /// An empty regexp. Always matches.
{
res[i] = !negate;
}
else
{
const bool match = regexp.getRE2()->Match(
const bool match = regexp->getRE2()->Match(
{reinterpret_cast<const char *>(cur_haystack_data), cur_haystack_length},
0,
cur_haystack_length,
@ -638,7 +631,7 @@ struct MatchImpl
const size_t start_pos = (required_substring_is_prefix) ? (match - cur_haystack_data) : 0;
const size_t end_pos = cur_haystack_length;
const bool match2 = regexp.getRE2()->Match(
const bool match2 = regexp->getRE2()->Match(
{reinterpret_cast<const char *>(cur_haystack_data), cur_haystack_length},
start_pos,
end_pos,

View File

@ -9,7 +9,6 @@
#include <vector>
#include <Functions/likePatternToRegexp.h>
#include <Common/Exception.h>
#include <Common/ObjectPool.h>
#include <Common/OptimizedRegularExpression.h>
#include <Common/ProfileEvents.h>
#include <Common/config.h>
@ -39,46 +38,74 @@ namespace ErrorCodes
namespace Regexps
{
using Regexp = OptimizedRegularExpressionSingleThreaded;
using Pool = ObjectPoolMap<Regexp, String>;
using RegexpPtr = std::shared_ptr<Regexp>;
template <bool like>
inline Regexp createRegexp(const std::string & pattern, int flags)
{
if constexpr (like)
return {likePatternToRegexp(pattern), flags};
else
return {pattern, flags};
}
template<bool no_capture, bool case_insensitive>
inline int buildRe2Flags()
template <bool like, bool no_capture, bool case_insensitive>
inline Regexp createRegexp(const std::string & pattern)
{
int flags = OptimizedRegularExpression::RE_DOT_NL;
if constexpr (no_capture)
flags |= OptimizedRegularExpression::RE_NO_CAPTURE;
if constexpr (case_insensitive)
flags |= OptimizedRegularExpression::RE_CASELESS;
return flags;
if constexpr (like)
return {likePatternToRegexp(pattern), flags};
else
return {pattern, flags};
}
/// Caches compiled re2 objects for given string patterns. Intended to support the common situation of a small set of patterns which are
/// evaluated over and over within the same query. In these situations, usage of the cache will save unnecessary pattern re-compilation.
/// However, we must be careful that caching does not add too much static overhead to overall pattern evaluation. Therefore, the cache is
/// intentionally very lightweight: a) no thread-safety/mutexes, b) small & fixed capacity, c) no collision list, d) but also no open
/// addressing, instead collisions simply replace the existing element.
class LocalCacheTable
{
public:
using RegexpPtr = std::shared_ptr<Regexp>;
LocalCacheTable()
: known_regexps(max_regexp_cache_size, {"", nullptr})
{
}
/** Returns holder of an object from Pool.
* You must hold the ownership while using the object.
* In destructor, it returns the object back to the Pool for further reuse.
*/
template <bool like, bool no_capture, bool case_insensitive>
inline Pool::Pointer get(const std::string & pattern)
void getOrSet(const String & pattern, RegexpPtr & regexp)
{
/// the Singleton is thread-safe in C++11
static Pool known_regexps; /// Different variables for different pattern parameters.
StringAndRegexp & bucket = known_regexps[hasher(pattern) % max_regexp_cache_size];
return known_regexps.get(pattern, [&pattern]
if (likely(bucket.regexp != nullptr))
{
const int flags = buildRe2Flags<no_capture, case_insensitive>();
ProfileEvents::increment(ProfileEvents::RegexpCreated);
return new Regexp{createRegexp<like>(pattern, flags)};
});
if (pattern == bucket.pattern)
regexp = bucket.regexp;
else
{
regexp = std::make_shared<Regexp>(createRegexp<like, no_capture, case_insensitive>(pattern));
bucket = {pattern, regexp};
}
}
else
{
regexp = std::make_shared<Regexp>(createRegexp<like, no_capture, case_insensitive>(pattern));
bucket = {pattern, regexp};
}
}
private:
std::hash<std::string> hasher;
struct StringAndRegexp
{
std::string pattern;
RegexpPtr regexp;
};
using CacheTable = std::vector<StringAndRegexp>;
CacheTable known_regexps;
constexpr static size_t max_regexp_cache_size = 100; // collision probability
};
}
#if USE_HYPERSCAN
@ -286,6 +313,7 @@ namespace MultiRegexps
lock.unlock();
return it->second();
}
}
#endif // USE_HYPERSCAN

View File

@ -112,7 +112,6 @@ public:
auto is_not_null = FunctionFactory::instance().get("isNotNull", context);
auto assume_not_null = FunctionFactory::instance().get("assumeNotNull", context);
auto multi_if = FunctionFactory::instance().get("multiIf", context);
ColumnsWithTypeAndName multi_if_args;
ColumnsWithTypeAndName tmp_args(1);
@ -144,7 +143,16 @@ public:
if (multi_if_args.size() == 1)
return multi_if_args.front().column;
ColumnPtr res = multi_if->build(multi_if_args)->execute(multi_if_args, result_type, input_rows_count);
/// If there was only two arguments (3 arguments passed to multiIf)
/// use function "if" instead, because it's implemented more efficient.
/// TODO: make "multiIf" the same efficient.
FunctionOverloadResolverPtr if_function;
if (multi_if_args.size() == 3)
if_function = FunctionFactory::instance().get("if", context);
else
if_function = FunctionFactory::instance().get("multiIf", context);
ColumnPtr res = if_function->build(multi_if_args)->execute(multi_if_args, result_type, input_rows_count);
/// if last argument is not nullable, result should be also not nullable
if (!multi_if_args.back().column->isNullable() && res->isNullable())

View File

@ -55,7 +55,7 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const ColumnConst * column_pattern = checkAndGetColumnConstStringOrFixedString(arguments[1].column.get());
Regexps::Pool::Pointer re = Regexps::get<false /* like */, true /* is_no_capture */, CountMatchesBase::case_insensitive>(column_pattern->getValue<String>());
const Regexps::Regexp re = Regexps::createRegexp</*is_like*/ false, /*no_capture*/ true, CountMatchesBase::case_insensitive>(column_pattern->getValue<String>());
OptimizedRegularExpression::MatchVec matches;
const IColumn * column_haystack = arguments[0].column.get();
@ -95,7 +95,7 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Error in FunctionCountMatches::getReturnTypeImpl()");
}
static uint64_t countMatches(StringRef src, Regexps::Pool::Pointer & re, OptimizedRegularExpression::MatchVec & matches)
static uint64_t countMatches(StringRef src, const Regexps::Regexp & re, OptimizedRegularExpression::MatchVec & matches)
{
/// Only one match is required, no need to copy more.
static const unsigned matches_limit = 1;
@ -108,7 +108,7 @@ public:
{
if (pos >= end)
break;
if (!re->match(pos, end - pos, matches, matches_limit))
if (!re.match(pos, end - pos, matches, matches_limit))
break;
/// Progress should be made, but with empty match the progress will not be done.
/// Also note that simply check is pattern empty is not enough,

View File

@ -21,9 +21,9 @@ struct ExtractImpl
res_data.reserve(data.size() / 5);
res_offsets.resize(offsets.size());
const auto & regexp = Regexps::get<false, false, false>(pattern);
const Regexps::Regexp regexp = Regexps::createRegexp<false, false, false>(pattern);
unsigned capture = regexp->getNumberOfSubpatterns() > 0 ? 1 : 0;
unsigned capture = regexp.getNumberOfSubpatterns() > 0 ? 1 : 0;
OptimizedRegularExpression::MatchVec matches;
matches.reserve(capture + 1);
size_t prev_offset = 0;
@ -34,7 +34,7 @@ struct ExtractImpl
size_t cur_offset = offsets[i];
unsigned count
= regexp->match(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1, matches, capture + 1);
= regexp.match(reinterpret_cast<const char *>(&data[prev_offset]), cur_offset - prev_offset - 1, matches, capture + 1);
if (count > capture && matches[capture].offset != std::string::npos)
{
const auto & match = matches[capture];

View File

@ -95,8 +95,8 @@ public:
throw Exception("Length of 'needle' argument must be greater than 0.", ErrorCodes::BAD_ARGUMENTS);
using StringPiece = typename Regexps::Regexp::StringPieceType;
auto holder = Regexps::get<false, false, false>(needle);
const auto & regexp = holder->getRE2();
const Regexps::Regexp holder = Regexps::createRegexp<false, false, false>(needle);
const auto & regexp = holder.getRE2();
if (!regexp)
throw Exception("There are no groups in regexp: " + needle, ErrorCodes::BAD_ARGUMENTS);

View File

@ -63,8 +63,8 @@ public:
if (needle.empty())
throw Exception(getName() + " length of 'needle' argument must be greater than 0.", ErrorCodes::BAD_ARGUMENTS);
auto regexp = Regexps::get<false, false, false>(needle);
const auto & re2 = regexp->getRE2();
const Regexps::Regexp regexp = Regexps::createRegexp<false, false, false>(needle);
const auto & re2 = regexp.getRE2();
if (!re2)
throw Exception("There are no groups in regexp: " + needle, ErrorCodes::BAD_ARGUMENTS);

165
src/Functions/grouping.h Normal file
View File

@ -0,0 +1,165 @@
#pragma once
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Core/ColumnNumbers.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
namespace DB
{
class FunctionGroupingBase : public IFunction
{
protected:
static constexpr UInt64 ONE = 1;
const ColumnNumbers arguments_indexes;
public:
FunctionGroupingBase(ColumnNumbers arguments_indexes_)
: arguments_indexes(std::move(arguments_indexes_))
{}
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool useDefaultImplementationForNulls() const override { return false; }
bool isSuitableForConstantFolding() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeUInt64>();
}
template <typename AggregationKeyChecker>
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, size_t input_rows_count, AggregationKeyChecker checker) const
{
const auto * grouping_set_column = checkAndGetColumn<ColumnUInt64>(arguments[0].column.get());
auto result = ColumnUInt64::create();
auto & result_data = result->getData();
result_data.reserve(input_rows_count);
for (size_t i = 0; i < input_rows_count; ++i)
{
UInt64 set_index = grouping_set_column->getElement(i);
UInt64 value = 0;
for (auto index : arguments_indexes)
value = (value << 1) + (checker(set_index, index) ? 1 : 0);
result_data.push_back(value);
}
return result;
}
};
class FunctionGroupingOrdinary : public FunctionGroupingBase
{
public:
explicit FunctionGroupingOrdinary(ColumnNumbers arguments_indexes_)
: FunctionGroupingBase(std::move(arguments_indexes_))
{}
String getName() const override { return "groupingOrdinary"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &, const DataTypePtr &, size_t input_rows_count) const override
{
UInt64 value = (ONE << arguments_indexes.size()) - 1;
return ColumnUInt64::create(input_rows_count, value);
}
};
class FunctionGroupingForRollup : public FunctionGroupingBase
{
const UInt64 aggregation_keys_number;
public:
FunctionGroupingForRollup(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_)
: FunctionGroupingBase(std::move(arguments_indexes_))
, aggregation_keys_number(aggregation_keys_number_)
{}
String getName() const override { return "groupingForRollup"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
return FunctionGroupingBase::executeImpl(arguments, input_rows_count,
[this](UInt64 set_index, UInt64 arg_index)
{
// For ROLLUP(a, b, c) there will be following grouping set indexes:
// | GROUPING SET | INDEX |
// | (a, b, c) | 0 |
// | (a, b) | 1 |
// | (a) | 2 |
// | () | 3 |
return arg_index < aggregation_keys_number - set_index;
}
);
}
};
class FunctionGroupingForCube : public FunctionGroupingBase
{
const UInt64 aggregation_keys_number;
public:
FunctionGroupingForCube(ColumnNumbers arguments_indexes_, UInt64 aggregation_keys_number_)
: FunctionGroupingBase(arguments_indexes_)
, aggregation_keys_number(aggregation_keys_number_)
{}
String getName() const override { return "groupingForCube"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
return FunctionGroupingBase::executeImpl(arguments, input_rows_count,
[this](UInt64 set_index, UInt64 arg_index)
{
// For CUBE(a, b) there will be following grouping set indexes:
// | GROUPING SET | INDEX |
// | (a, b) | 0 |
// | (a) | 1 |
// | (b) | 2 |
// | () | 3 |
auto set_mask = (ONE << aggregation_keys_number) - 1 - set_index;
return set_mask & (ONE << (aggregation_keys_number - arg_index - 1));
}
);
}
};
class FunctionGroupingForGroupingSets : public FunctionGroupingBase
{
ColumnNumbersSetList grouping_sets;
public:
FunctionGroupingForGroupingSets(ColumnNumbers arguments_indexes_, ColumnNumbersList const & grouping_sets_)
: FunctionGroupingBase(std::move(arguments_indexes_))
{
for (auto const & set : grouping_sets_)
grouping_sets.emplace_back(set.begin(), set.end());
}
String getName() const override { return "groupingForGroupingSets"; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
return FunctionGroupingBase::executeImpl(arguments, input_rows_count,
[this](UInt64 set_index, UInt64 arg_index)
{
return grouping_sets[set_index].contains(arg_index);
}
);
}
};
}

View File

@ -68,7 +68,7 @@ void registerFunctionEncrypt(FunctionFactory & factory);
void registerFunctionDecrypt(FunctionFactory & factory);
void registerFunctionAESEncryptMysql(FunctionFactory & factory);
void registerFunctionAESDecryptMysql(FunctionFactory & factory);
void registerFunctionShowCertificate(FunctionFactory &);
#endif
void registerFunctions()
@ -135,6 +135,7 @@ void registerFunctions()
registerFunctionDecrypt(factory);
registerFunctionAESEncryptMysql(factory);
registerFunctionAESDecryptMysql(factory);
registerFunctionShowCertificate(factory);
#endif
registerFunctionTid(factory);
registerFunctionLogTrace(factory);

View File

@ -765,9 +765,9 @@ namespace S3
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requestrs_logging)
bool enable_s3_requests_logging)
{
return PocoHTTPClientConfiguration(force_region, remote_host_filter, s3_max_redirects, enable_s3_requestrs_logging);
return PocoHTTPClientConfiguration(force_region, remote_host_filter, s3_max_redirects, enable_s3_requests_logging);
}
URI::URI(const Poco::URI & uri_)

View File

@ -45,7 +45,7 @@ public:
const String & force_region,
const RemoteHostFilter & remote_host_filter,
unsigned int s3_max_redirects,
bool enable_s3_requestrs_logging);
bool enable_s3_requests_logging);
private:
ClientFactory();

View File

@ -39,7 +39,7 @@ void ActionsDAG::Node::toTree(JSONBuilder::JSONMap & map) const
map.add("Result Type", result_type->getName());
if (!result_name.empty())
map.add("Result Type", magic_enum::enum_name(type));
map.add("Result Name", result_name);
if (column)
map.add("Column", column->getName());

View File

@ -1,6 +1,12 @@
#include <memory>
#include <Common/quoteString.h>
#include <Common/typeid_cast.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnFixedString.h>
#include <Core/ColumnNumbers.h>
#include <Core/ColumnWithTypeAndName.h>
#include <Functions/grouping.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionsMiscellaneous.h>
@ -8,10 +14,12 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <Columns/ColumnSet.h>
@ -56,6 +64,9 @@ namespace ErrorCodes
extern const int INCORRECT_ELEMENT_OF_SET;
extern const int BAD_ARGUMENTS;
extern const int DUPLICATE_COLUMN;
extern const int LOGICAL_ERROR;
extern const int TOO_FEW_ARGUMENTS_FOR_FUNCTION;
extern const int TOO_MANY_ARGUMENTS_FOR_FUNCTION;
}
static NamesAndTypesList::iterator findColumn(const String & name, NamesAndTypesList & cols)
@ -459,10 +470,18 @@ public:
};
ActionsMatcher::Data::Data(
ContextPtr context_, SizeLimits set_size_limit_, size_t subquery_depth_,
const NamesAndTypesList & source_columns_, ActionsDAGPtr actions_dag,
PreparedSets & prepared_sets_, SubqueriesForSets & subqueries_for_sets_,
bool no_subqueries_, bool no_makeset_, bool only_consts_, bool create_source_for_in_)
ContextPtr context_,
SizeLimits set_size_limit_,
size_t subquery_depth_,
std::reference_wrapper<const NamesAndTypesList> source_columns_,
ActionsDAGPtr actions_dag,
PreparedSets & prepared_sets_,
SubqueriesForSets & subqueries_for_sets_,
bool no_subqueries_,
bool no_makeset_,
bool only_consts_,
bool create_source_for_in_,
AggregationKeysInfo aggregation_keys_info_)
: WithContext(context_)
, set_size_limit(set_size_limit_)
, subquery_depth(subquery_depth_)
@ -475,6 +494,7 @@ ActionsMatcher::Data::Data(
, create_source_for_in(create_source_for_in_)
, visit_depth(0)
, actions_stack(std::move(actions_dag), context_)
, aggregation_keys_info(aggregation_keys_info_)
, next_unique_suffix(actions_stack.getLastActions().getIndex().size() + 1)
{
}
@ -817,6 +837,55 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
return;
}
if (node.name == "grouping")
{
if (data.only_consts)
return; // Can not perform constant folding, because this function can be executed only after GROUP BY
size_t arguments_size = node.arguments->children.size();
if (arguments_size == 0)
throw Exception(ErrorCodes::TOO_FEW_ARGUMENTS_FOR_FUNCTION, "Function GROUPING expects at least one argument");
if (arguments_size > 64)
throw Exception(ErrorCodes::TOO_MANY_ARGUMENTS_FOR_FUNCTION, "Function GROUPING can have up to 64 arguments, but {} provided", arguments_size);
auto keys_info = data.aggregation_keys_info;
auto aggregation_keys_number = keys_info.aggregation_keys.size();
ColumnNumbers arguments_indexes;
for (auto const & arg : node.arguments->children)
{
size_t pos = keys_info.aggregation_keys.getPosByName(arg->getColumnName());
if (pos == aggregation_keys_number)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Argument of GROUPING function {} is not a part of GROUP BY clause", arg->getColumnName());
arguments_indexes.push_back(pos);
}
switch (keys_info.group_by_kind)
{
case GroupByKind::GROUPING_SETS:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForGroupingSets>(std::move(arguments_indexes), keys_info.grouping_set_keys)), { "__grouping_set" }, column_name);
break;
}
case GroupByKind::ROLLUP:
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForRollup>(std::move(arguments_indexes), aggregation_keys_number)), { "__grouping_set" }, column_name);
break;
case GroupByKind::CUBE:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingForCube>(std::move(arguments_indexes), aggregation_keys_number)), { "__grouping_set" }, column_name);
break;
}
case GroupByKind::ORDINARY:
{
data.addFunction(std::make_shared<FunctionToOverloadResolverAdaptor>(std::make_shared<FunctionGroupingOrdinary>(std::move(arguments_indexes))), {}, column_name);
break;
}
default:
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Unexpected kind of GROUP BY clause for GROUPING function: {}", keys_info.group_by_kind);
}
return;
}
SetPtr prepared_set;
if (checkFunctionIsInOrGlobalInOperator(node))
{

View File

@ -1,10 +1,12 @@
#pragma once
#include <Core/NamesAndTypes.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/SubqueryForSet.h>
#include <Parsers/IAST.h>
#include <Core/ColumnNumbers.h>
namespace DB
@ -76,6 +78,42 @@ class ASTIdentifier;
class ASTFunction;
class ASTLiteral;
enum class GroupByKind
{
NONE,
ORDINARY,
ROLLUP,
CUBE,
GROUPING_SETS,
};
/*
* This class stores information about aggregation keys used in GROUP BY clause.
* It's used for providing information about aggregation to GROUPING function
* implementation.
*/
struct AggregationKeysInfo
{
AggregationKeysInfo(
std::reference_wrapper<const NamesAndTypesList> aggregation_keys_,
std::reference_wrapper<const ColumnNumbersList> grouping_set_keys_,
GroupByKind group_by_kind_)
: aggregation_keys(aggregation_keys_)
, grouping_set_keys(grouping_set_keys_)
, group_by_kind(group_by_kind_)
{}
AggregationKeysInfo(const AggregationKeysInfo &) = default;
AggregationKeysInfo(AggregationKeysInfo &&) = default;
// Names and types of all used keys
const NamesAndTypesList & aggregation_keys;
// Indexes of aggregation keys used in each grouping set (only for GROUP BY GROUPING SETS)
const ColumnNumbersList & grouping_set_keys;
GroupByKind group_by_kind;
};
/// Collect ExpressionAction from AST. Returns PreparedSets and SubqueriesForSets too.
class ActionsMatcher
{
@ -95,6 +133,7 @@ public:
bool create_source_for_in;
size_t visit_depth;
ScopeStack actions_stack;
AggregationKeysInfo aggregation_keys_info;
/*
* Remember the last unique column suffix to avoid quadratic behavior
@ -107,14 +146,15 @@ public:
ContextPtr context_,
SizeLimits set_size_limit_,
size_t subquery_depth_,
const NamesAndTypesList & source_columns_,
std::reference_wrapper<const NamesAndTypesList> source_columns_,
ActionsDAGPtr actions_dag,
PreparedSets & prepared_sets_,
SubqueriesForSets & subqueries_for_sets_,
bool no_subqueries_,
bool no_makeset_,
bool only_consts_,
bool create_source_for_in_);
bool create_source_for_in_,
AggregationKeysInfo aggregation_keys_info_);
/// Does result of the calculation already exists in the block.
bool hasColumn(const String & column_name) const;

View File

@ -43,10 +43,13 @@
#include <Common/typeid_cast.h>
#include <Common/StringUtils/StringUtils.h>
#include <Core/ColumnNumbers.h>
#include <Core/Names.h>
#include <Core/NamesAndTypes.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/DataTypeFixedString.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/GetAggregatesVisitor.h>
@ -325,12 +328,21 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
{
if (ASTPtr group_by_ast = select_query->groupBy())
{
NameSet unique_keys;
NameToIndexMap unique_keys;
ASTs & group_asts = group_by_ast->children;
if (select_query->group_by_with_rollup)
group_by_kind = GroupByKind::ROLLUP;
else if (select_query->group_by_with_cube)
group_by_kind = GroupByKind::CUBE;
else if (select_query->group_by_with_grouping_sets && group_asts.size() > 1)
group_by_kind = GroupByKind::GROUPING_SETS;
else
group_by_kind = GroupByKind::ORDINARY;
/// For GROUPING SETS with multiple groups we always add virtual __grouping_set column
/// With set number, which is used as an additional key at the stage of merging aggregating data.
if (select_query->group_by_with_grouping_sets && group_asts.size() > 1)
if (group_by_kind != GroupByKind::ORDINARY)
aggregated_columns.emplace_back("__grouping_set", std::make_shared<DataTypeUInt64>());
for (ssize_t i = 0; i < static_cast<ssize_t>(group_asts.size()); ++i)
@ -347,6 +359,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
group_elements_ast = group_ast_element->children;
NamesAndTypesList grouping_set_list;
ColumnNumbers grouping_set_indexes_list;
for (ssize_t j = 0; j < ssize_t(group_elements_ast.size()); ++j)
{
@ -387,15 +400,21 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
/// Aggregation keys are unique.
if (!unique_keys.contains(key.name))
{
unique_keys.insert(key.name);
unique_keys[key.name] = aggregation_keys.size();
grouping_set_indexes_list.push_back(aggregation_keys.size());
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
aggregated_columns.push_back(std::move(key));
}
else
{
grouping_set_indexes_list.push_back(unique_keys[key.name]);
}
}
aggregation_keys_list.push_back(std::move(grouping_set_list));
aggregation_keys_indexes_list.push_back(std::move(grouping_set_indexes_list));
}
else
{
@ -433,7 +452,7 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
/// Aggregation keys are uniqued.
if (!unique_keys.contains(key.name))
{
unique_keys.insert(key.name);
unique_keys[key.name] = aggregation_keys.size();
aggregation_keys.push_back(key);
/// Key is no longer needed, therefore we can save a little by moving it.
@ -442,6 +461,13 @@ void ExpressionAnalyzer::analyzeAggregation(ActionsDAGPtr & temp_actions)
}
}
if (!select_query->group_by_with_grouping_sets)
{
auto & list = aggregation_keys_indexes_list.emplace_back();
for (size_t i = 0; i < aggregation_keys.size(); ++i)
list.push_back(i);
}
if (group_asts.empty())
{
select_query->setExpression(ASTSelectQuery::Expression::GROUP_BY, {});
@ -583,7 +609,8 @@ void ExpressionAnalyzer::getRootActions(const ASTPtr & ast, bool no_makeset_for_
no_makeset_for_subqueries,
false /* no_makeset */,
only_consts,
!isRemoteStorage() /* create_source_for_in */);
!isRemoteStorage() /* create_source_for_in */,
getAggregationKeysInfo());
ActionsVisitor(visitor_data, log.stream()).visit(ast);
actions = visitor_data.getActions();
}
@ -603,7 +630,8 @@ void ExpressionAnalyzer::getRootActionsNoMakeSet(const ASTPtr & ast, ActionsDAGP
true /* no_makeset_for_subqueries, no_makeset implies no_makeset_for_subqueries */,
true /* no_makeset */,
only_consts,
!isRemoteStorage() /* create_source_for_in */);
!isRemoteStorage() /* create_source_for_in */,
getAggregationKeysInfo());
ActionsVisitor(visitor_data, log.stream()).visit(ast);
actions = visitor_data.getActions();
}
@ -624,7 +652,8 @@ void ExpressionAnalyzer::getRootActionsForHaving(
no_makeset_for_subqueries,
false /* no_makeset */,
only_consts,
true /* create_source_for_in */);
true /* create_source_for_in */,
getAggregationKeysInfo());
ActionsVisitor(visitor_data, log.stream()).visit(ast);
actions = visitor_data.getActions();
}

View File

@ -1,6 +1,8 @@
#pragma once
#include <Core/ColumnNumbers.h>
#include <Columns/FilterDescription.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/AggregateDescription.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/SubqueryForSet.h>
@ -67,6 +69,7 @@ struct ExpressionAnalyzerData
bool has_aggregation = false;
NamesAndTypesList aggregation_keys;
NamesAndTypesLists aggregation_keys_list;
ColumnNumbersList aggregation_keys_indexes_list;
bool has_const_aggregation_keys = false;
AggregateDescriptions aggregate_descriptions;
@ -77,6 +80,8 @@ struct ExpressionAnalyzerData
/// All new temporary tables obtained by performing the GLOBAL IN/JOIN subqueries.
TemporaryTablesMapping external_tables;
GroupByKind group_by_kind = GroupByKind::NONE;
};
@ -200,6 +205,11 @@ protected:
NamesAndTypesList getColumnsAfterArrayJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);
NamesAndTypesList analyzeJoin(ActionsDAGPtr & actions, const NamesAndTypesList & src_columns);
AggregationKeysInfo getAggregationKeysInfo() const noexcept
{
return { aggregation_keys, aggregation_keys_indexes_list, group_by_kind };
}
};
class SelectQueryExpressionAnalyzer;
@ -326,7 +336,15 @@ public:
bool hasGlobalSubqueries() { return has_global_subqueries; }
bool hasTableJoin() const { return syntax->ast_join; }
bool useGroupingSetKey() const { return aggregation_keys_list.size() > 1; }
/// When there is only one group in GROUPING SETS
/// it is a special case that is equal to GROUP BY, i.e.:
///
/// GROUPING SETS ((a, b)) -> GROUP BY a, b
///
/// But it is rewritten by GroupingSetsRewriterVisitor to GROUP BY,
/// so instead of aggregation_keys_list.size() > 1,
/// !aggregation_keys_list.empty() can be used.
bool useGroupingSetKey() const { return !aggregation_keys_list.empty(); }
const NamesAndTypesList & aggregationKeys() const { return aggregation_keys; }
bool hasConstAggregationKeys() const { return has_const_aggregation_keys; }

View File

@ -0,0 +1,22 @@
#include <Interpreters/GroupingSetsRewriterVisitor.h>
#include <Parsers/ASTSelectQuery.h>
#include <Parsers/ASTExpressionList.h>
namespace DB
{
void GroupingSetsRewriterData::visit(ASTSelectQuery & select_query, ASTPtr &)
{
const ASTPtr group_by = select_query.groupBy();
if (!group_by || !select_query.group_by_with_grouping_sets)
return;
if (group_by->children.size() != 1)
return;
select_query.setExpression(ASTSelectQuery::Expression::GROUP_BY, std::move(group_by->children.front()));
select_query.group_by_with_grouping_sets = false;
}
}

View File

@ -0,0 +1,30 @@
#pragma once
#include <Parsers/IAST.h>
#include <Interpreters/InDepthNodeVisitor.h>
namespace DB
{
class ASTSelectQuery;
/// Rewrite GROUPING SETS with only one group, to GROUP BY.
///
/// Examples:
/// - GROUPING SETS (foo) -> GROUP BY foo
/// - GROUPING SETS ((foo, bar)) -> GROUP BY foo, bar
///
/// But not the following:
/// - GROUPING SETS (foo, bar) (since it has two groups (foo) and (bar))
class GroupingSetsRewriterData
{
public:
using TypeToVisit = ASTSelectQuery;
static void visit(ASTSelectQuery & select_query, ASTPtr &);
};
using GroupingSetsRewriterMatcher = OneTypeMatcher<GroupingSetsRewriterData>;
using GroupingSetsRewriterVisitor = InDepthNodeVisitor<GroupingSetsRewriterMatcher, true>;
}

View File

@ -538,6 +538,7 @@ void HashJoin::dataMapInit(MapsVariant & map)
bool HashJoin::overDictionary() const
{
assert(data->type != Type::DICT || table_join->getDictionaryReader());
return data->type == Type::DICT;
}
@ -707,6 +708,13 @@ namespace
void HashJoin::initRightBlockStructure(Block & saved_block_sample)
{
if (isCrossOrComma(kind))
{
/// cross join doesn't have keys, just add all columns
saved_block_sample = sample_block_with_columns_to_add.cloneEmpty();
return;
}
bool multiple_disjuncts = !table_join->oneDisjunct();
/// We could remove key columns for LEFT | INNER HashJoin but we should keep them for JoinSwitcher (if any).
bool save_key_columns = !table_join->forceHashJoin() || isRightOrFull(kind) || multiple_disjuncts;
@ -724,11 +732,9 @@ void HashJoin::initRightBlockStructure(Block & saved_block_sample)
for (auto & column : sample_block_with_columns_to_add)
{
if (!saved_block_sample.findByName(column.name))
{
saved_block_sample.insert(column);
}
}
}
Block HashJoin::structureRightBlock(const Block & block) const
{
@ -912,6 +918,7 @@ public:
bool is_join_get_)
: join_on_keys(join_on_keys_)
, rows_to_add(block.rows())
, sample_block(saved_block_sample)
, is_join_get(is_join_get_)
{
size_t num_columns_to_add = block_with_columns_to_add.columns();
@ -951,12 +958,46 @@ public:
return ColumnWithTypeAndName(std::move(columns[i]), type_name[i].type, type_name[i].qualified_name);
}
static void assertBlockEqualsStructureUpToLowCard(const Block & lhs_block, const Block & rhs_block)
{
if (lhs_block.columns() != rhs_block.columns())
throw Exception("Different number of columns in blocks", ErrorCodes::LOGICAL_ERROR);
for (size_t i = 0; i < lhs_block.columns(); ++i)
{
const auto & lhs = lhs_block.getByPosition(i);
const auto & rhs = rhs_block.getByPosition(i);
if (lhs.name != rhs.name)
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
const auto & ltype = recursiveRemoveLowCardinality(lhs.type);
const auto & rtype = recursiveRemoveLowCardinality(rhs.type);
if (!ltype->equals(*rtype))
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
const auto & lcol = recursiveRemoveLowCardinality(lhs.column);
const auto & rcol = recursiveRemoveLowCardinality(rhs.column);
if (lcol->getDataType() != rcol->getDataType())
throw DB::Exception(ErrorCodes::LOGICAL_ERROR, "Block structure mismatch: [{}] != [{}]",
lhs_block.dumpStructure(), rhs_block.dumpStructure());
}
}
template <bool has_defaults>
void appendFromBlock(const Block & block, size_t row_num)
{
if constexpr (has_defaults)
applyLazyDefaults();
#ifndef NDEBUG
/// Like assertBlocksHaveEqualStructure but doesn't check low cardinality
assertBlockEqualsStructureUpToLowCard(sample_block, block);
#else
UNUSED(assertBlockEqualsStructureUpToLowCard);
#endif
if (is_join_get)
{
/// If it's joinGetOrNull, we need to wrap not-nullable columns in StorageJoin.
@ -1019,6 +1060,7 @@ private:
size_t lazy_defaults_count = 0;
/// for ASOF
const IColumn * left_asof_key = nullptr;
Block sample_block;
bool is_join_get;
@ -1698,7 +1740,7 @@ DataTypePtr HashJoin::joinGetCheckAndGetReturnType(const DataTypes & data_types,
throw Exception("StorageJoin doesn't contain column " + column_name, ErrorCodes::NO_SUCH_COLUMN_IN_TABLE);
auto elem = sample_block_with_columns_to_add.getByName(column_name);
if (or_null)
if (or_null && JoinCommon::canBecomeNullable(elem.type))
elem.type = makeNullable(elem.type);
return elem.type;
}

View File

@ -12,8 +12,13 @@ class IInterpreterUnionOrSelectQuery : public IInterpreter
{
public:
IInterpreterUnionOrSelectQuery(const ASTPtr & query_ptr_, ContextPtr context_, const SelectQueryOptions & options_)
: IInterpreterUnionOrSelectQuery(query_ptr_, Context::createCopy(context_), options_)
{
}
IInterpreterUnionOrSelectQuery(const ASTPtr & query_ptr_, ContextMutablePtr context_, const SelectQueryOptions & options_)
: query_ptr(query_ptr_)
, context(Context::createCopy(context_))
, context(context_)
, options(options_)
, max_streams(context->getSettingsRef().max_threads)
{
@ -60,4 +65,3 @@ protected:
bool uses_view_source = false;
};
}

View File

@ -51,6 +51,14 @@ InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
const SelectQueryOptions & options_)
:InterpreterSelectIntersectExceptQuery(query_ptr_, Context::createCopy(context_), options_)
{
}
InterpreterSelectIntersectExceptQuery::InterpreterSelectIntersectExceptQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
const SelectQueryOptions & options_)
: IInterpreterUnionOrSelectQuery(query_ptr_->clone(), context_, options_)
{
ASTSelectIntersectExceptQuery * ast = query_ptr->as<ASTSelectIntersectExceptQuery>();

View File

@ -24,6 +24,11 @@ public:
ContextPtr context_,
const SelectQueryOptions & options_);
InterpreterSelectIntersectExceptQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
const SelectQueryOptions & options_);
BlockIO execute() override;
Block getSampleBlock() { return result_header; }

View File

@ -165,6 +165,14 @@ InterpreterSelectQuery::InterpreterSelectQuery(
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, nullptr, options_, required_result_column_names_)
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
const SelectQueryOptions & options_,
const Names & required_result_column_names_)
: InterpreterSelectQuery(query_ptr_, context_, std::nullopt, nullptr, options_, required_result_column_names_)
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextPtr context_,
@ -280,6 +288,28 @@ InterpreterSelectQuery::InterpreterSelectQuery(
const StorageMetadataPtr & metadata_snapshot_,
SubqueriesForSets subquery_for_sets_,
PreparedSets prepared_sets_)
: InterpreterSelectQuery(
query_ptr_,
Context::createCopy(context_),
std::move(input_pipe_),
storage_,
options_,
required_result_column_names,
metadata_snapshot_,
std::move(subquery_for_sets_),
std::move(prepared_sets_))
{}
InterpreterSelectQuery::InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
std::optional<Pipe> input_pipe_,
const StoragePtr & storage_,
const SelectQueryOptions & options_,
const Names & required_result_column_names,
const StorageMetadataPtr & metadata_snapshot_,
SubqueriesForSets subquery_for_sets_,
PreparedSets prepared_sets_)
/// NOTE: the query almost always should be cloned because it will be modified during analysis.
: IInterpreterUnionOrSelectQuery(options_.modify_inplace ? query_ptr_ : query_ptr_->clone(), context_, options_)
, storage(storage_)
@ -1098,6 +1128,9 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
if (query.group_by_with_grouping_sets && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "GROUPING SETS are not supported together with ROLLUP and CUBE");
if (expressions.hasHaving() && query.group_by_with_totals && (query.group_by_with_rollup || query.group_by_with_cube))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "WITH TOTALS and WITH ROLLUP or CUBE are not supported together in presence of HAVING");
if (query_info.projection && query_info.projection->desc->type == ProjectionDescription::Type::Aggregate)
{
query_info.projection->aggregate_overflow_row = aggregate_overflow_row;
@ -1386,12 +1419,8 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, std::optional<P
executeRollupOrCube(query_plan, Modificator::CUBE);
if ((query.group_by_with_rollup || query.group_by_with_cube || query.group_by_with_grouping_sets) && expressions.hasHaving())
{
if (query.group_by_with_totals)
throw Exception("WITH TOTALS and WITH ROLLUP or CUBE or GROUPING SETS are not supported together in presence of HAVING", ErrorCodes::NOT_IMPLEMENTED);
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);
}
}
else if (expressions.hasHaving())
executeHaving(query_plan, expressions.before_having, expressions.remove_having_filter);
}
@ -2060,12 +2089,15 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
if (prewhere_info)
query_info.prewhere_info = prewhere_info;
bool optimize_read_in_order = analysis_result.optimize_read_in_order;
bool optimize_aggregation_in_order = analysis_result.optimize_aggregation_in_order && !query_analyzer->useGroupingSetKey();
/// Create optimizer with prepared actions.
/// Maybe we will need to calc input_order_info later, e.g. while reading from StorageMerge.
if ((analysis_result.optimize_read_in_order || analysis_result.optimize_aggregation_in_order)
if ((optimize_read_in_order || optimize_aggregation_in_order)
&& (!query_info.projection || query_info.projection->complete))
{
if (analysis_result.optimize_read_in_order)
if (optimize_read_in_order)
{
if (query_info.projection)
{
@ -2291,7 +2323,7 @@ void InterpreterSelectQuery::executeAggregation(QueryPlan & query_plan, const Ac
SortDescription group_by_sort_description;
if (group_by_info && settings.optimize_aggregation_in_order)
if (group_by_info && settings.optimize_aggregation_in_order && !query_analyzer->useGroupingSetKey())
group_by_sort_description = getSortDescriptionFromGroupBy(getSelectQuery());
else
group_by_info = nullptr;

View File

@ -55,6 +55,12 @@ public:
const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{});
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
const SelectQueryOptions &,
const Names & required_result_column_names_ = Names{});
/// Read data not from the table specified in the query, but from the prepared pipe `input`.
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
@ -133,6 +139,17 @@ private:
SubqueriesForSets subquery_for_sets_ = {},
PreparedSets prepared_sets_ = {});
InterpreterSelectQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
std::optional<Pipe> input_pipe,
const StoragePtr & storage_,
const SelectQueryOptions &,
const Names & required_result_column_names = {},
const StorageMetadataPtr & metadata_snapshot_ = nullptr,
SubqueriesForSets subquery_for_sets_ = {},
PreparedSets prepared_sets_ = {});
ASTSelectQuery & getSelectQuery() { return query_ptr->as<ASTSelectQuery &>(); }
void addPrewhereAliasActions();

View File

@ -31,8 +31,13 @@ namespace ErrorCodes
}
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_, ContextPtr context_,
const SelectQueryOptions & options_, const Names & required_result_column_names)
const ASTPtr & query_ptr_, ContextPtr context_, const SelectQueryOptions & options_, const Names & required_result_column_names)
: InterpreterSelectWithUnionQuery(query_ptr_, Context::createCopy(context_), options_, required_result_column_names)
{
}
InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_, ContextMutablePtr context_, const SelectQueryOptions & options_, const Names & required_result_column_names)
: IInterpreterUnionOrSelectQuery(query_ptr_, context_, options_)
{
ASTSelectWithUnionQuery * ast = query_ptr->as<ASTSelectWithUnionQuery>();
@ -46,6 +51,10 @@ InterpreterSelectWithUnionQuery::InterpreterSelectWithUnionQuery(
if (!num_children)
throw Exception("Logical error: no children in ASTSelectWithUnionQuery", ErrorCodes::LOGICAL_ERROR);
/// This is required for UNION to match headers correctly.
if (num_children > 1)
options.reorderColumns();
/// Note that we pass 'required_result_column_names' to first SELECT.
/// And for the rest, we pass names at the corresponding positions of 'required_result_column_names' in the result of first SELECT,
/// because names could be different.

View File

@ -22,6 +22,12 @@ public:
const SelectQueryOptions &,
const Names & required_result_column_names = {});
InterpreterSelectWithUnionQuery(
const ASTPtr & query_ptr_,
ContextMutablePtr context_,
const SelectQueryOptions &,
const Names & required_result_column_names = {});
~InterpreterSelectWithUnionQuery() override;
/// Builds QueryPlan for current query.

View File

@ -16,6 +16,26 @@
namespace DB
{
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
}
PartLogElement::MergeReasonType PartLogElement::getMergeReasonType(MergeType merge_type)
{
switch (merge_type)
{
case MergeType::Regular:
return REGULAR_MERGE;
case MergeType::TTLDelete:
return TTL_DELETE_MERGE;
case MergeType::TTLRecompress:
return TTL_RECOMPRESS_MERGE;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Unknown MergeType {}", static_cast<UInt64>(merge_type));
}
NamesAndTypesList PartLogElement::getNamesAndTypes()
{
auto event_type_datatype = std::make_shared<DataTypeEnum8>(
@ -30,11 +50,22 @@ NamesAndTypesList PartLogElement::getNamesAndTypes()
}
);
auto merge_reason_datatype = std::make_shared<DataTypeEnum8>(
DataTypeEnum8::Values
{
{"NotAMerge", static_cast<Int8>(NOT_A_MERGE)},
{"RegularMerge", static_cast<Int8>(REGULAR_MERGE)},
{"TTLDeleteMerge", static_cast<Int8>(TTL_DELETE_MERGE)},
{"TTLRecompressMerge", static_cast<Int8>(TTL_RECOMPRESS_MERGE)},
}
);
ColumnsWithTypeAndName columns_with_type_and_name;
return {
{"query_id", std::make_shared<DataTypeString>()},
{"event_type", std::move(event_type_datatype)},
{"merge_reason", std::move(merge_reason_datatype)},
{"event_date", std::make_shared<DataTypeDate>()},
{"event_time", std::make_shared<DataTypeDateTime>()},
@ -72,6 +103,7 @@ void PartLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(query_id);
columns[i++]->insert(event_type);
columns[i++]->insert(merge_reason);
columns[i++]->insert(DateLUT::instance().toDayNum(event_time).toUnderType());
columns[i++]->insert(event_time);
columns[i++]->insert(event_time_microseconds);

View File

@ -4,6 +4,7 @@
#include <Interpreters/SystemLog.h>
#include <Core/NamesAndTypes.h>
#include <Core/NamesAndAliases.h>
#include <Storages/MergeTree/MergeType.h>
namespace DB
@ -21,9 +22,22 @@ struct PartLogElement
MOVE_PART = 6,
};
enum MergeReasonType
{
/// merge_reason is relevant only for event_type = 'MERGE_PARTS', in other cases it is NOT_A_MERGE
NOT_A_MERGE = 1,
/// Just regular merge
REGULAR_MERGE = 2,
/// Merge assigned to delete some data from parts (with TTLMergeSelector)
TTL_DELETE_MERGE = 3,
/// Merge with recompression
TTL_RECOMPRESS_MERGE = 4,
};
String query_id;
Type event_type = NEW_PART;
MergeReasonType merge_reason = NOT_A_MERGE;
time_t event_time = 0;
Decimal64 event_time_microseconds = 0;
@ -57,6 +71,7 @@ struct PartLogElement
static std::string name() { return "PartLog"; }
static MergeReasonType getMergeReasonType(MergeType merge_type);
static NamesAndTypesList getNamesAndTypes();
static NamesAndAliases getNamesAndAliases() { return {}; }
void appendToBlock(MutableColumns & columns) const;

View File

@ -225,6 +225,8 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
if (settings.memory_tracker_fault_probability)
thread_group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);
thread_group->memory_tracker.setOvercommitWaitingTime(settings.memory_usage_overcommit_max_wait_microseconds);
/// NOTE: Do not set the limit for thread-level memory tracker since it could show unreal values
/// since allocation and deallocation could happen in different threads
}
@ -244,7 +246,6 @@ ProcessList::EntryPtr ProcessList::insert(const String & query_, const IAST * as
user_process_list.user_memory_tracker.setOrRaiseHardLimit(settings.max_memory_usage_for_user);
user_process_list.user_memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator_for_user);
user_process_list.user_memory_tracker.setDescription("(for user)");
user_process_list.user_overcommit_tracker.setMaxWaitTime(settings.memory_usage_overcommit_max_wait_microseconds);
if (!user_process_list.user_throttler)
{

Some files were not shown because too many files have changed in this diff Show More