Merge branch 'master' into fiber-local-var-2

This commit is contained in:
Kruglov Pavel 2023-05-22 15:17:45 +02:00 committed by GitHub
commit 054ffc47b7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
208 changed files with 3236 additions and 988 deletions

View File

@ -270,8 +270,8 @@ endif ()
option (ENABLE_BUILD_PATH_MAPPING "Enable remapping of file source paths in debug info, predefined preprocessor macros, and __builtin_FILE(). It's used to generate reproducible builds. See https://reproducible-builds.org/docs/build-path" ${ENABLE_BUILD_PATH_MAPPING_DEFAULT})
if (ENABLE_BUILD_PATH_MAPPING)
set (COMPILER_FLAGS "${COMPILER_FLAGS} -ffile-prefix-map=${CMAKE_SOURCE_DIR}=.")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} -ffile-prefix-map=${CMAKE_SOURCE_DIR}=.")
set (COMPILER_FLAGS "${COMPILER_FLAGS} -ffile-prefix-map=${PROJECT_SOURCE_DIR}=.")
set (CMAKE_ASM_FLAGS "${CMAKE_ASM_FLAGS} -ffile-prefix-map=${PROJECT_SOURCE_DIR}=.")
endif ()
option (ENABLE_BUILD_PROFILING "Enable profiling of build time" OFF)
@ -568,7 +568,7 @@ if (NATIVE_BUILD_TARGETS
)
message (STATUS "Building native targets...")
set (NATIVE_BUILD_DIR "${CMAKE_BINARY_DIR}/native")
set (NATIVE_BUILD_DIR "${PROJECT_BINARY_DIR}/native")
execute_process(
COMMAND ${CMAKE_COMMAND} -E make_directory "${NATIVE_BUILD_DIR}"
@ -582,7 +582,7 @@ if (NATIVE_BUILD_TARGETS
# Avoid overriding .cargo/config.toml with native toolchain.
"-DENABLE_RUST=OFF"
"-DENABLE_CLICKHOUSE_SELF_EXTRACTING=${ENABLE_CLICKHOUSE_SELF_EXTRACTING}"
${CMAKE_SOURCE_DIR}
${PROJECT_SOURCE_DIR}
WORKING_DIRECTORY "${NATIVE_BUILD_DIR}"
COMMAND_ECHO STDOUT)

View File

@ -3,6 +3,7 @@
#include <cassert>
#include <stdexcept> // for std::logic_error
#include <string>
#include <type_traits>
#include <vector>
#include <functional>
#include <iosfwd>
@ -326,5 +327,16 @@ namespace ZeroTraits
inline void set(StringRef & x) { x.size = 0; }
}
namespace PackedZeroTraits
{
template <typename Second, template <typename, typename> class PackedPairNoInit>
inline bool check(const PackedPairNoInit<StringRef, Second> p)
{ return 0 == p.key.size; }
template <typename Second, template <typename, typename> class PackedPairNoInit>
inline void set(PackedPairNoInit<StringRef, Second> & p)
{ p.key.size = 0; }
}
std::ostream & operator<<(std::ostream & os, const StringRef & str);

View File

@ -5,11 +5,11 @@ if (NOT TARGET check)
if (CMAKE_CONFIGURATION_TYPES)
add_custom_target (check COMMAND ${CMAKE_CTEST_COMMAND}
--force-new-ctest-process --output-on-failure --build-config "$<CONFIGURATION>"
WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
WORKING_DIRECTORY ${PROJECT_BINARY_DIR})
else ()
add_custom_target (check COMMAND ${CMAKE_CTEST_COMMAND}
--force-new-ctest-process --output-on-failure
WORKING_DIRECTORY ${CMAKE_BINARY_DIR})
WORKING_DIRECTORY ${PROJECT_BINARY_DIR})
endif ()
endif ()

View File

@ -5,14 +5,14 @@ if (Git_FOUND)
# Commit hash + whether the building workspace was dirty or not
execute_process(COMMAND
"${GIT_EXECUTABLE}" rev-parse HEAD
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
OUTPUT_VARIABLE GIT_HASH
ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
# Branch name
execute_process(COMMAND
"${GIT_EXECUTABLE}" rev-parse --abbrev-ref HEAD
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
OUTPUT_VARIABLE GIT_BRANCH
ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
@ -20,14 +20,14 @@ if (Git_FOUND)
SET(ENV{TZ} "UTC")
execute_process(COMMAND
"${GIT_EXECUTABLE}" log -1 --format=%ad --date=iso-local
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
OUTPUT_VARIABLE GIT_DATE
ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
# Subject of the commit
execute_process(COMMAND
"${GIT_EXECUTABLE}" log -1 --format=%s
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR}
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR}
OUTPUT_VARIABLE GIT_COMMIT_SUBJECT
ERROR_QUIET OUTPUT_STRIP_TRAILING_WHITESPACE)
@ -35,7 +35,7 @@ if (Git_FOUND)
execute_process(
COMMAND ${GIT_EXECUTABLE} status
WORKING_DIRECTORY ${CMAKE_SOURCE_DIR} OUTPUT_STRIP_TRAILING_WHITESPACE)
WORKING_DIRECTORY ${PROJECT_SOURCE_DIR} OUTPUT_STRIP_TRAILING_WHITESPACE)
else()
message(STATUS "Git could not be found.")
endif()

View File

@ -7,6 +7,6 @@ message (STATUS "compiler CXX = ${CMAKE_CXX_COMPILER} ${FULL_CXX_FLAGS}")
message (STATUS "LINKER_FLAGS = ${FULL_EXE_LINKER_FLAGS}")
# Reproducible builds
string (REPLACE "${CMAKE_SOURCE_DIR}" "." FULL_C_FLAGS_NORMALIZED "${FULL_C_FLAGS}")
string (REPLACE "${CMAKE_SOURCE_DIR}" "." FULL_CXX_FLAGS_NORMALIZED "${FULL_CXX_FLAGS}")
string (REPLACE "${CMAKE_SOURCE_DIR}" "." FULL_EXE_LINKER_FLAGS_NORMALIZED "${FULL_EXE_LINKER_FLAGS}")
string (REPLACE "${PROJECT_SOURCE_DIR}" "." FULL_C_FLAGS_NORMALIZED "${FULL_C_FLAGS}")
string (REPLACE "${PROJECT_SOURCE_DIR}" "." FULL_CXX_FLAGS_NORMALIZED "${FULL_CXX_FLAGS}")
string (REPLACE "${PROJECT_SOURCE_DIR}" "." FULL_EXE_LINKER_FLAGS_NORMALIZED "${FULL_EXE_LINKER_FLAGS}")

View File

@ -29,14 +29,14 @@ if (SANITIZE)
# Linking can fail due to relocation overflows (see #49145), caused by too big object files / libraries.
# Work around this with position-independent builds (-fPIC and -fpie), this is slightly slower than non-PIC/PIE but that's okay.
set (MSAN_FLAGS "-fsanitize=memory -fsanitize-memory-use-after-dtor -fsanitize-memory-track-origins -fno-optimize-sibling-calls -fPIC -fpie -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/msan_suppressions.txt")
set (MSAN_FLAGS "-fsanitize=memory -fsanitize-memory-use-after-dtor -fsanitize-memory-track-origins -fno-optimize-sibling-calls -fPIC -fpie -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/tests/msan_suppressions.txt")
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} ${MSAN_FLAGS}")
set (CMAKE_C_FLAGS "${CMAKE_C_FLAGS} ${SAN_FLAGS} ${MSAN_FLAGS}")
elseif (SANITIZE STREQUAL "thread")
set (TSAN_FLAGS "-fsanitize=thread")
if (COMPILER_CLANG)
set (TSAN_FLAGS "${TSAN_FLAGS} -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/tsan_suppressions.txt")
set (TSAN_FLAGS "${TSAN_FLAGS} -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/tests/tsan_suppressions.txt")
endif()
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} ${TSAN_FLAGS}")
@ -54,7 +54,7 @@ if (SANITIZE)
set(UBSAN_FLAGS "${UBSAN_FLAGS} -fno-sanitize=unsigned-integer-overflow")
endif()
if (COMPILER_CLANG)
set (UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize-blacklist=${CMAKE_SOURCE_DIR}/tests/ubsan_suppressions.txt")
set (UBSAN_FLAGS "${UBSAN_FLAGS} -fsanitize-blacklist=${PROJECT_SOURCE_DIR}/tests/ubsan_suppressions.txt")
endif()
set (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${SAN_FLAGS} ${UBSAN_FLAGS}")

View File

@ -1,4 +1,4 @@
include(${CMAKE_SOURCE_DIR}/cmake/autogenerated_versions.txt)
include(${PROJECT_SOURCE_DIR}/cmake/autogenerated_versions.txt)
set(VERSION_EXTRA "" CACHE STRING "")
set(VERSION_TWEAK "" CACHE STRING "")

View File

@ -6,7 +6,7 @@ if (NOT ENABLE_AVRO)
return()
endif()
set(AVROCPP_ROOT_DIR "${CMAKE_SOURCE_DIR}/contrib/avro/lang/c++")
set(AVROCPP_ROOT_DIR "${PROJECT_SOURCE_DIR}/contrib/avro/lang/c++")
set(AVROCPP_INCLUDE_DIR "${AVROCPP_ROOT_DIR}/api")
set(AVROCPP_SOURCE_DIR "${AVROCPP_ROOT_DIR}/impl")

View File

@ -18,7 +18,7 @@ endif()
# Need to use C++17 since the compilation is not possible with C++20 currently.
set (CMAKE_CXX_STANDARD 17)
set(CASS_ROOT_DIR ${CMAKE_SOURCE_DIR}/contrib/cassandra)
set(CASS_ROOT_DIR ${PROJECT_SOURCE_DIR}/contrib/cassandra)
set(CASS_SRC_DIR "${CASS_ROOT_DIR}/src")
set(CASS_INCLUDE_DIR "${CASS_ROOT_DIR}/include")

View File

@ -26,7 +26,7 @@ endif ()
# StorageSystemTimeZones.generated.cpp is autogenerated each time during a build
# data in this file will be used to populate the system.time_zones table, this is specific to OS_LINUX
# as the library that's built using embedded tzdata is also specific to OS_LINUX
set(SYSTEM_STORAGE_TZ_FILE "${CMAKE_BINARY_DIR}/src/Storages/System/StorageSystemTimeZones.generated.cpp")
set(SYSTEM_STORAGE_TZ_FILE "${PROJECT_BINARY_DIR}/src/Storages/System/StorageSystemTimeZones.generated.cpp")
# remove existing copies so that its generated fresh on each build.
file(REMOVE ${SYSTEM_STORAGE_TZ_FILE})

View File

@ -1,7 +1,7 @@
# This file is a modified version of contrib/libuv/CMakeLists.txt
set (SOURCE_DIR "${CMAKE_SOURCE_DIR}/contrib/libuv")
set (BINARY_DIR "${CMAKE_BINARY_DIR}/contrib/libuv")
set (SOURCE_DIR "${PROJECT_SOURCE_DIR}/contrib/libuv")
set (BINARY_DIR "${PROJECT_BINARY_DIR}/contrib/libuv")
set(uv_sources
src/fs-poll.c

View File

@ -15,7 +15,7 @@ endif()
# This is the LGPL libmariadb project.
set(CC_SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/mariadb-connector-c)
set(CC_SOURCE_DIR ${PROJECT_SOURCE_DIR}/contrib/mariadb-connector-c)
set(CC_BINARY_DIR ${CMAKE_CURRENT_BINARY_DIR})
set(WITH_SSL ON)

View File

@ -1,4 +1,4 @@
set (SOURCE_DIR "${CMAKE_SOURCE_DIR}/contrib/snappy")
set (SOURCE_DIR "${PROJECT_SOURCE_DIR}/contrib/snappy")
if (ARCH_S390X)
set (SNAPPY_IS_BIG_ENDIAN 1)

View File

@ -1,4 +1,4 @@
set (SOURCE_DIR ${CMAKE_SOURCE_DIR}/contrib/zlib-ng)
set (SOURCE_DIR ${PROJECT_SOURCE_DIR}/contrib/zlib-ng)
add_definitions(-DZLIB_COMPAT)
add_definitions(-DWITH_GZFILEOP)

View File

@ -2,34 +2,115 @@
slug: /en/operations/named-collections
sidebar_position: 69
sidebar_label: "Named collections"
title: "Named collections"
---
# Storing details for connecting to external sources in configuration files
Named collections provide a way to store collections of key-value pairs to be
used to configure integrations with external sources. You can use named collections with
dictionaries, tables, table functions, and object storage.
Details for connecting to external sources (dictionaries, tables, table functions) can be saved
in configuration files and thus simplify the creation of objects and hide credentials
from users with only SQL access.
Named collections can be configured with DDL or in configuration files and are applied
when ClickHouse starts. They simplify the creation of objects and the hiding of credentials
from users without administrative access.
Parameters can be set in XML `<format>CSV</format>` and overridden in SQL `, format = 'TSV'`.
The parameters in SQL can be overridden using format `key` = `value`: `compression_method = 'gzip'`.
The keys in a named collection must match the parameter names of the corresponding
function, table engine, database, etc. In the examples below the parameter list is
linked to for each type.
Named collections are stored in the `config.xml` file of the ClickHouse server in the `<named_collections>` section and are applied when ClickHouse starts.
Parameters set in a named collection can be overridden in SQL, this is shown in the examples
below.
Example of configuration:
```xml
$ cat /etc/clickhouse-server/config.d/named_collections.xml
## Storing named collections in the system database
### DDL example
```sql
CREATE NAMED COLLECTION name AS
key_1 = 'value',
key_2 = 'value2',
url = 'https://connection.url/'
```
### Permissions to create named collections with DDL
To manage named collections with DDL a user must have the `named_control_collection` privilege. This can be assigned by adding a file to `/etc/clickhouse-server/users.d/`. The example gives the user `default` both the `access_management` and `named_collection_control` privileges:
```xml title='/etc/clickhouse-server/users.d/user_default.xml'
<clickhouse>
<users>
<default>
<password_sha256_hex>65e84be33532fb784c48129675f9eff3a682b27168c0ea744b2cf58ee02337c5</password_sha256_hex replace=true>
<access_management>1</access_management>
<!-- highlight-start -->
<named_collection_control>1</named_collection_control>
<!-- highlight-end -->
</default>
</users>
</clickhouse>
```
:::tip
In the above example the `passowrd_sha256_hex` value is the hexadecimal representation of the SHA256 hash of the password. This configuration for the user `default` has the attribute `replace=true` as in the default configuration has a plain text `password` set, and it is not possible to have both plain text and sha256 hex passwords set for a user.
:::
## Storing named collections in configuration files
### XML example
```xml title='/etc/clickhouse-server/config.d/named_collections.xml'
<clickhouse>
<named_collections>
...
<name>
<key_1>value</key_1>
<key_2>value_2</key_2>
<url>https://connection.url/</url>
</name>
</named_collections>
</clickhouse>
```
## Named collections for accessing S3.
## Modifying named collections
Named collections that are created with DDL queries can be altered or dropped with DDL. Named collections created with XML files can be managed by editing or deleting the corresponding XML.
### Alter a DDL named collection
Change or add the keys `key1` and `key3` of the collection `collection2`:
```sql
ALTER NAMED COLLECTION collection2 SET key1=4, key3='value3'
```
Remove the key `key2` from `collection2`:
```sql
ALTER NAMED COLLECTION collection2 DELETE key2
```
Change or add the key `key1` and delete the key `key3` of the collection `collection2`:
```sql
ALTER NAMED COLLECTION collection2 SET key1=4, DELETE key3
```
### Drop the DDL named collection `collection2`:
```sql
DROP NAMED COLLECTION collection2
```
## Named collections for accessing S3
The description of parameters see [s3 Table Function](../sql-reference/table-functions/s3.md).
Example of configuration:
### DDL example
```sql
CREATE NAMED COLLECTION s3_mydata AS
access_key_id = 'AKIAIOSFODNN7EXAMPLE',
secret_access_key = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY',
format = 'CSV',
url = 'https://s3.us-east-1.amazonaws.com/yourbucket/mydata/'
```
### XML example
```xml
<clickhouse>
<named_collections>
@ -43,23 +124,23 @@ Example of configuration:
</clickhouse>
```
### Example of using named collections with the s3 function
### s3() function and S3 Table named collection examples
Both of the following examples use the same named collection `s3_mydata`:
#### s3() function
```sql
INSERT INTO FUNCTION s3(s3_mydata, filename = 'test_file.tsv.gz',
format = 'TSV', structure = 'number UInt64', compression_method = 'gzip')
SELECT * FROM numbers(10000);
SELECT count()
FROM s3(s3_mydata, filename = 'test_file.tsv.gz')
┌─count()─┐
│ 10000 │
└─────────┘
1 rows in set. Elapsed: 0.279 sec. Processed 10.00 thousand rows, 90.00 KB (35.78 thousand rows/s., 322.02 KB/s.)
```
### Example of using named collections with an S3 table
:::tip
The first argument to the `s3()` function above is the name of the collection, `s3_mydata`. Without named collections, the access key ID, secret, format, and URL would all be passed in every call to the `s3()` function.
:::
#### S3 table
```sql
CREATE TABLE s3_engine_table (number Int64)
@ -78,7 +159,22 @@ SELECT * FROM s3_engine_table LIMIT 3;
The description of parameters see [mysql](../sql-reference/table-functions/mysql.md).
Example of configuration:
### DDL example
```sql
CREATE NAMED COLLECTION mymysql AS
user = 'myuser',
password = 'mypass',
host = '127.0.0.1',
port = 3306,
database = 'test'
connection_pool_size = 8
on_duplicate_clause = 1
replace_query = 1
```
### XML example
```xml
<clickhouse>
<named_collections>
@ -96,7 +192,11 @@ Example of configuration:
</clickhouse>
```
### Example of using named collections with the mysql function
### mysql() function, MySQL table, MySQL database, and Dictionary named collection examples
The four following examples use the same named collection `mymysql`:
#### mysql() function
```sql
SELECT count() FROM mysql(mymysql, table = 'test');
@ -105,8 +205,11 @@ SELECT count() FROM mysql(mymysql, table = 'test');
│ 3 │
└─────────┘
```
:::note
The named collection does not specify the `table` parameter, so it is specified in the function call as `table = 'test'`.
:::
### Example of using named collections with an MySQL table
#### MySQL table
```sql
CREATE TABLE mytable(A Int64) ENGINE = MySQL(mymysql, table = 'test', connection_pool_size=3, replace_query=0);
@ -117,7 +220,11 @@ SELECT count() FROM mytable;
└─────────┘
```
### Example of using named collections with database with engine MySQL
:::note
The DDL overrides the named collection setting for connection_pool_size.
:::
#### MySQL database
```sql
CREATE DATABASE mydatabase ENGINE = MySQL(mymysql);
@ -130,7 +237,7 @@ SHOW TABLES FROM mydatabase;
└────────┘
```
### Example of using named collections with a dictionary with source MySQL
#### MySQL Dictionary
```sql
CREATE DICTIONARY dict (A Int64, B String)
@ -150,6 +257,17 @@ SELECT dictGet('dict', 'B', 2);
The description of parameters see [postgresql](../sql-reference/table-functions/postgresql.md).
```sql
CREATE NAMED COLLECTION mypg AS
user = 'pguser',
password = 'jw8s0F4',
host = '127.0.0.1',
port = 5432,
database = 'test',
schema = 'test_schema',
connection_pool_size = 8
```
Example of configuration:
```xml
<clickhouse>
@ -229,12 +347,22 @@ SELECT dictGet('dict', 'b', 2);
└─────────────────────────┘
```
## Named collections for accessing remote ClickHouse database
## Named collections for accessing a remote ClickHouse database
The description of parameters see [remote](../sql-reference/table-functions/remote.md/#parameters).
Example of configuration:
```sql
CREATE NAMED COLLECTION remote1 AS
host = 'remote_host',
port = 9000,
database = 'system',
user = 'foo',
password = 'secret',
secure = 1
```
```xml
<clickhouse>
<named_collections>
@ -286,3 +414,4 @@ SELECT dictGet('dict', 'b', 1);
│ a │
└─────────────────────────┘
```

View File

@ -452,6 +452,8 @@ Possible values:
The first phase of a grace join reads the right table and splits it into N buckets depending on the hash value of key columns (initially, N is `grace_hash_join_initial_buckets`). This is done in a way to ensure that each bucket can be processed independently. Rows from the first bucket are added to an in-memory hash table while the others are saved to disk. If the hash table grows beyond the memory limit (e.g., as set by [`max_bytes_in_join`](/docs/en/operations/settings/query-complexity.md/#settings-max_bytes_in_join)), the number of buckets is increased and the assigned bucket for each row. Any rows which dont belong to the current bucket are flushed and reassigned.
Supports `INNER/LEFT/RIGHT/FULL ALL/ANY JOIN`.
- hash
[Hash join algorithm](https://en.wikipedia.org/wiki/Hash_join) is used. The most generic implementation that supports all combinations of kind and strictness and multiple join keys that are combined with `OR` in the `JOIN ON` section.
@ -1377,6 +1379,12 @@ Possible values:
Default value: `default`.
## allow_experimental_parallel_reading_from_replicas
If true, ClickHouse will send a SELECT query to all replicas of a table (up to `max_parallel_replicas`) . It will work for any kind of MergeTree table.
Default value: `false`.
## compile_expressions {#compile-expressions}
Enables or disables compilation of frequently used simple functions and operators to native code with LLVM at runtime.
@ -1708,7 +1716,7 @@ Default value: `100000`.
### async_insert_max_query_number {#async-insert-max-query-number}
The maximum number of insert queries per block before being inserted. This setting takes effect only if [async_insert_deduplicate](#settings-async-insert-deduplicate) is enabled.
The maximum number of insert queries per block before being inserted. This setting takes effect only if [async_insert_deduplicate](#async-insert-deduplicate) is enabled.
Possible values:
@ -1739,7 +1747,7 @@ Possible values:
Default value: `0`.
### async_insert_deduplicate {#settings-async-insert-deduplicate}
### async_insert_deduplicate {#async-insert-deduplicate}
Enables or disables insert deduplication of `ASYNC INSERT` (for Replicated\* tables).
@ -3213,17 +3221,6 @@ Possible values:
Default value: `0`.
## allow_experimental_geo_types {#allow-experimental-geo-types}
Allows working with experimental [geo data types](../../sql-reference/data-types/geo.md).
Possible values:
- 0 — Working with geo data types is disabled.
- 1 — Working with geo data types is enabled.
Default value: `0`.
## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously}
Adds a modifier `SYNC` to all `DROP` and `DETACH` queries.

View File

@ -97,8 +97,8 @@ Columns:
- `forwarded_for` ([String](../../sql-reference/data-types/string.md)) — HTTP header `X-Forwarded-For` passed in the HTTP query.
- `quota_key` ([String](../../sql-reference/data-types/string.md)) — The `quota key` specified in the [quotas](../../operations/quotas.md) setting (see `keyed`).
- `revision` ([UInt32](../../sql-reference/data-types/int-uint.md)) — ClickHouse revision.
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events)
- `Settings` ([Map(String, String)](../../sql-reference/data-types/array.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/map.md)) — ProfileEvents that measure different metrics. The description of them could be found in the table [system.events](../../operations/system-tables/events.md#system_tables-events)
- `Settings` ([Map(String, String)](../../sql-reference/data-types/map.md)) — Settings that were changed when the client ran the query. To enable logging changes to settings, set the `log_query_settings` parameter to 1.
- `log_comment` ([String](../../sql-reference/data-types/string.md)) — Log comment. It can be set to arbitrary string no longer than [max_query_size](../../operations/settings/settings.md#settings-max_query_size). An empty string if it is not defined.
- `thread_ids` ([Array(UInt64)](../../sql-reference/data-types/array.md)) — Thread ids that are participating in query execution.
- `used_aggregate_functions` ([Array(String)](../../sql-reference/data-types/array.md)) — Canonical names of `aggregate functions`, which were used during query execution.

View File

@ -0,0 +1,29 @@
---
slug: /en/operations/system-tables/zookeeper_connection
---
#zookeeper_connection
This table does not exist if ZooKeeper is not configured. The 'system.zookeeper_connection' table shows current connections to ZooKeeper (including auxiliary ZooKeepers). Each row shows information about one connection.
Columns:
- `name` ([String](../../sql-reference/data-types/string.md)) — ZooKeeper cluster's name.
- `host` ([String](../../sql-reference/data-types/string.md)) — The hostname/IP of the ZooKeeper node that ClickHouse connected to.
- `port` ([String](../../sql-reference/data-types/string.md)) — The port of the ZooKeeper node that ClickHouse connected to.
- `index` ([UInt8](../../sql-reference/data-types/int-uint.md)) — The index of the ZooKeeper node that ClickHouse connected to. The index is from ZooKeeper config.
- `connected_time` ([String](../../sql-reference/data-types/string.md)) — When the connection was established
- `is_expired` ([UInt8](../../sql-reference/data-types/int-uint.md)) — Is the current connection expired.
- `keeper_api_version` ([String](../../sql-reference/data-types/string.md)) — Keeper API version.
- `client_id` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Session id of the connection.
Example:
``` sql
SELECT * FROM system.zookeeper_connection;
```
``` text
┌─name──────────────┬─host─────────┬─port─┬─index─┬──────connected_time─┬─is_expired─┬─keeper_api_version─┬──────────client_id─┐
│ default_zookeeper │ 127.0.0.1 │ 2181 │ 0 │ 2023-05-19 14:30:16 │ 0 │ 0 │ 216349144108826660 │
└───────────────────┴──────────────┴──────┴───────┴─────────────────────┴────────────┴────────────────────┴────────────────────┘
```

View File

@ -29,5 +29,5 @@ ClickHouse data types include:
- **Tuples**: A [`Tuple` of elements](./tuple.md), each having an individual type.
- **Nullable**: [`Nullable`](./nullable.md) allows you to store a value as `NULL` when a value is "missing" (instead of the column gettings its default value for the data type)
- **IP addresses**: use [`IPv4`](./domains/ipv4.md) and [`IPv6`](./domains/ipv6.md) to efficiently store IP addresses
- **Geo types**: for[ geographical data](./geo.md), including `Point`, `Ring`, `Polygon` and `MultiPolygon`
- **Special data types**: including [`Expression`](./special-data-types/expression.md), [`Set`](./special-data-types/set.md), [`Nothing`](./special-data-types/nothing.md) and [`Interval`](./special-data-types/interval.md)
- **Geo types**: for [geographical data](./geo.md), including `Point`, `Ring`, `Polygon` and `MultiPolygon`
- **Special data types**: including [`Expression`](./special-data-types/expression.md), [`Set`](./special-data-types/set.md), [`Nothing`](./special-data-types/nothing.md) and [`Interval`](./special-data-types/interval.md)

View File

@ -267,14 +267,16 @@ or
LAYOUT(HASHED())
```
If `shards` greater then 1 (default is `1`) the dictionary will load data in parallel, useful if you have huge amount of elements in one dictionary.
Configuration example:
``` xml
<layout>
<hashed>
<!-- If shards greater then 1 (default is `1`) the dictionary will load
data in parallel, useful if you have huge amount of elements in one
dictionary. -->
<shards>10</shards>
<!-- Size of the backlog for blocks in parallel queue.
Since the bottleneck in parallel loading is rehash, and so to avoid
@ -284,6 +286,14 @@ Configuration example:
10000 is good balance between memory and speed.
Even for 10e10 elements and can handle all the load without starvation. -->
<shard_load_queue_backlog>10000</shard_load_queue_backlog>
<!-- Maximum load factor of the hash table, with greater values, the memory
is utilized more efficiently (less memory is wasted) but read/performance
may deteriorate.
Valid values: [0.5, 0.99]
Default: 0.5 -->
<max_load_factor>0.5</max_load_factor>
</hashed>
</layout>
```
@ -291,7 +301,7 @@ Configuration example:
or
``` sql
LAYOUT(HASHED(SHARDS 10 [SHARD_LOAD_QUEUE_BACKLOG 10000]))
LAYOUT(HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000] [MAX_LOAD_FACTOR 0.5]))
```
### sparse_hashed
@ -304,14 +314,18 @@ Configuration example:
``` xml
<layout>
<sparse_hashed />
<sparse_hashed>
<!-- <shards>1</shards> -->
<!-- <shard_load_queue_backlog>10000</shard_load_queue_backlog> -->
<!-- <max_load_factor>0.5</max_load_factor> -->
</sparse_hashed>
</layout>
```
or
``` sql
LAYOUT(SPARSE_HASHED())
LAYOUT(SPARSE_HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000] [MAX_LOAD_FACTOR 0.5]))
```
It is also possible to use `shards` for this type of dictionary, and again it is more important for `sparse_hashed` then for `hashed`, since `sparse_hashed` is slower.
@ -325,8 +339,9 @@ Configuration example:
``` xml
<layout>
<complex_key_hashed>
<shards>1</shards>
<!-- <shards>1</shards> -->
<!-- <shard_load_queue_backlog>10000</shard_load_queue_backlog> -->
<!-- <max_load_factor>0.5</max_load_factor> -->
</complex_key_hashed>
</layout>
```
@ -334,7 +349,7 @@ Configuration example:
or
``` sql
LAYOUT(COMPLEX_KEY_HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
LAYOUT(COMPLEX_KEY_HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000] [MAX_LOAD_FACTOR 0.5]))
```
### complex_key_sparse_hashed
@ -346,7 +361,9 @@ Configuration example:
``` xml
<layout>
<complex_key_sparse_hashed>
<shards>1</shards>
<!-- <shards>1</shards> -->
<!-- <shard_load_queue_backlog>10000</shard_load_queue_backlog> -->
<!-- <max_load_factor>0.5</max_load_factor> -->
</complex_key_sparse_hashed>
</layout>
```
@ -354,7 +371,7 @@ Configuration example:
or
``` sql
LAYOUT(COMPLEX_KEY_SPARSE_HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000]))
LAYOUT(COMPLEX_KEY_SPARSE_HASHED([SHARDS 1] [SHARD_LOAD_QUEUE_BACKLOG 10000] [MAX_LOAD_FACTOR 0.5]))
```
### hashed_array
@ -2197,16 +2214,16 @@ Result:
└─────────────────────────────────┴───────┘
```
## RegExp Tree Dictionary {#regexp-tree-dictionary}
## Regular Expression Tree Dictionary {#regexp-tree-dictionary}
Regexp Tree dictionary stores multiple trees of regular expressions with attributions. Users can retrieve strings in the dictionary. If a string matches the root of the regexp tree, we will collect the corresponding attributes of the matched root and continue to walk the children. If any of the children matches the string, we will collect attributes and rewrite the old ones if conflicts occur, then continue the traverse until we reach leaf nodes.
Regular expression tree dictionaries are a special type of dictionary which represent the mapping from key to attributes using a tree of regular expressions. There are some use cases, e.g. parsing of (user agent)[https://en.wikipedia.org/wiki/User_agent] strings, which can be expressed elegantly with regexp tree dictionaries.
Example of the ddl query for creating Regexp Tree dictionary:
### Use Regular Expression Tree Dictionary in ClickHouse Open-Source
<CloudDetails />
Regular expression tree dictionaries are defined in ClickHouse open-source using the YAMLRegExpTree source which is provided the path to a YAML file containing the regular expression tree.
```sql
create dictionary regexp_dict
CREATE DICTIONARY regexp_dict
(
regexp String,
name String,
@ -2218,17 +2235,15 @@ LAYOUT(regexp_tree)
...
```
**Source**
The dictionary source `YAMLRegExpTree` represents the structure of a regexp tree. For example:
We introduce a type of source called `YAMLRegExpTree` representing the structure of Regexp Tree dictionary. An Example of a valid yaml config is like:
```xml
```yaml
- regexp: 'Linux/(\d+[\.\d]*).+tlinux'
name: 'TencentOS'
version: '\1'
- regexp: '\d+/tclwebkit(?:\d+[\.\d]*)'
name: 'Andriod'
name: 'Android'
versions:
- regexp: '33/tclwebkit'
version: '13'
@ -2240,17 +2255,14 @@ We introduce a type of source called `YAMLRegExpTree` representing the structure
version: '10'
```
The key `regexp` represents the regular expression of a tree node. The name of key is same as the dictionary key. The `name` and `version` is user-defined attributions in the dicitionary. The `versions` (which can be any name that not appear in attributions or the key) indicates the children nodes of this tree.
This config consists of a list of regular expression tree nodes. Each node has the following structure:
**Back Reference**
- **regexp**: the regular expression of the node.
- **attributes**: a list of user-defined dictionary attributes. In this example, there are two attributes: `name` and `version`. The first node defines both attributes. The second node only defines attribute `name`. Attribute `version` is provided by the child nodes of the second node.
- The value of an attribute may contain **back references**, referring to capture groups of the matched regular expression. In the example, the value of attribute `version` in the first node consists of a back-reference `\1` to capture group `(\d+[\.\d]*)` in the regular expression. Back-reference numbers range from 1 to 9 and are written as `$1` or `\1` (for number 1). The back reference is replaced by the matched capture group during query execution.
- **child nodes**: a list of children of a regexp tree node, each of which has its own attributes and (potentially) children nodes. String matching proceeds in a depth-first fashion. If a string matches a regexp node, the dictionary checks if it also matches the nodes' child nodes. If that is the case, the attributes of the deepest matching node are assigned. Attributes of a child node overwrite equally named attributes of parent nodes. The name of child nodes in YAML files can be arbitrary, e.g. `versions` in above example.
The value of an attribution could contain a back reference which refers to a capture group of the matched regular expression. Reference number ranges from 1 to 9 and writes as `$1` or `\1`.
During the query execution, the back reference in the value will be replaced by the matched capture group.
**Query**
Due to the specialty of Regexp Tree dictionary, we only allow functions `dictGet`, `dictGetOrDefault` and `dictGetOrNull` work with it.
Regexp tree dictionaries only allow access using functions `dictGet`, `dictGetOrDefault` and `dictGetOrNull`.
Example:
@ -2260,12 +2272,83 @@ SELECT dictGet('regexp_dict', ('name', 'version'), '31/tclwebkit1024');
Result:
```
```text
┌─dictGet('regexp_dict', ('name', 'version'), '31/tclwebkit1024')─┐
│ ('Andriod','12') │
│ ('Android','12') │
└─────────────────────────────────────────────────────────────────┘
```
In this case, we first match the regular expression `\d+/tclwebkit(?:\d+[\.\d]*)` in the top layer's second node. The dictionary then continues to look into the child nodes and finds that the string also matches `3[12]/tclwebkit`. As a result, the value of attribute `name` is `Android` (defined in the first layer) and the value of attribute `version` is `12` (defined the child node).
With a powerful YAML configure file, we can use a regexp tree dictionaries as a user agent string parser. We support [uap-core](https://github.com/ua-parser/uap-core) and demonstrate how to use it in the functional test [02504_regexp_dictionary_ua_parser](https://github.com/ClickHouse/ClickHouse/blob/master/tests/queries/0_stateless/02504_regexp_dictionary_ua_parser.sh)
### Use Regular Expression Tree Dictionary in ClickHouse Cloud
Above used `YAMLRegExpTree` source works in ClickHouse Open Source but not in ClickHouse Cloud. To use regexp tree dictionaries in ClickHouse could, first create a regexp tree dictionary from a YAML file locally in ClickHouse Open Source, then dump this dictionary into a CSV file using the `dictionary` table function and the [INTO OUTFILE](../statements/select/into-outfile.md) clause.
```sql
SELECT * FROM dictionary(regexp_dict) INTO OUTFILE('regexp_dict.csv')
```
The content of csv file is:
```text
1,0,"Linux/(\d+[\.\d]*).+tlinux","['version','name']","['\\1','TencentOS']"
2,0,"(\d+)/tclwebkit(\d+[\.\d]*)","['comment','version','name']","['test $1 and $2','$1','Android']"
3,2,"33/tclwebkit","['version']","['13']"
4,2,"3[12]/tclwebkit","['version']","['12']"
5,2,"3[12]/tclwebkit","['version']","['11']"
6,2,"3[12]/tclwebkit","['version']","['10']"
```
The schema of dumped file is:
- `id UInt64`: the id of the RegexpTree node.
- `parent_id UInt64`: the id of the parent of a node.
- `regexp String`: the regular expression string.
- `keys Array(String)`: the names of user-defined attributes.
- `values Array(String)`: the values of user-defined attributes.
To create the dictionary in ClickHouse Cloud, first create a table `regexp_dictionary_source_table` with below table structure:
```sql
CREATE TABLE regexp_dictionary_source_table
(
id UInt64,
parent_id UInt64,
regexp String,
keys Array(String),
values Array(String)
) ENGINE=Memory;
```
Then update the local CSV by
```bash
clickhouse client \
--host MY_HOST \
--secure \
--password MY_PASSWORD \
--query "
INSERT INTO regexp_dictionary_source_table
SELECT * FROM input ('id UInt64, parent_id UInt64, regexp String, keys Array(String), values Array(String)')
FORMAT CSV" < regexp_dict.csv
```
You can see how to [Insert Local Files](https://clickhouse.com/docs/en/integrations/data-ingestion/insert-local-files) for more details. After we initialize the source table, we can create a RegexpTree by table source:
``` sql
CREATE DICTIONARY regexp_dict
(
regexp String,
name String,
version String
PRIMARY KEY(regexp)
SOURCE(CLICKHOUSE(TABLE 'regexp_dictionary_source_table'))
LIFETIME(0)
LAYOUT(regexp_tree);
```
## Embedded Dictionaries {#embedded-dictionaries}
<SelfManaged />

View File

@ -284,13 +284,17 @@ Manipulates data in the specifies partition matching the specified filtering exp
Syntax:
``` sql
ALTER TABLE [db.]table [ON CLUSTER cluster] UPDATE column1 = expr1 [, ...] [IN PARTITION partition_id] WHERE filter_expr
ALTER TABLE [db.]table [ON CLUSTER cluster] UPDATE column1 = expr1 [, ...] [IN PARTITION partition_expr] WHERE filter_expr
```
### Example
``` sql
-- using partition name
ALTER TABLE mt UPDATE x = x + 1 IN PARTITION 2 WHERE p = 2;
-- using partition id
ALTER TABLE mt UPDATE x = x + 1 IN PARTITION ID '2' WHERE p = 2;
```
### See Also
@ -304,13 +308,17 @@ Deletes data in the specifies partition matching the specified filtering express
Syntax:
``` sql
ALTER TABLE [db.]table [ON CLUSTER cluster] DELETE [IN PARTITION partition_id] WHERE filter_expr
ALTER TABLE [db.]table [ON CLUSTER cluster] DELETE [IN PARTITION partition_expr] WHERE filter_expr
```
### Example
``` sql
-- using partition name
ALTER TABLE mt DELETE IN PARTITION 2 WHERE p = 2;
-- using partition id
ALTER TABLE mt DELETE IN PARTITION ID '2' WHERE p = 2;
```
### See Also

View File

@ -9,7 +9,7 @@ sidebar_label: GRANT
- Grants [privileges](#grant-privileges) to ClickHouse user accounts or roles.
- Assigns roles to user accounts or to the other roles.
To revoke privileges, use the [REVOKE](../../sql-reference/statements/revoke.md) statement. Also you can list granted privileges with the [SHOW GRANTS](../../sql-reference/statements/show.md#show-grants-statement) statement.
To revoke privileges, use the [REVOKE](../../sql-reference/statements/revoke.md) statement. Also you can list granted privileges with the [SHOW GRANTS](../../sql-reference/statements/show.md#show-grants) statement.
## Granting Privilege Syntax

View File

@ -3185,16 +3185,6 @@ SELECT * FROM test2;
Значение по умолчанию: `0`.
## allow_experimental_geo_types {#allow-experimental-geo-types}
Разрешает использование экспериментальных типов данных для работы с [географическими структурами](../../sql-reference/data-types/geo.md).
Возможные значения:
- 0 — использование типов данных для работы с географическими структурами не поддерживается.
- 1 — использование типов данных для работы с географическими структурами поддерживается.
Значение по умолчанию: `0`.
## database_atomic_wait_for_drop_and_detach_synchronously {#database_atomic_wait_for_drop_and_detach_synchronously}
Добавляет модификатор `SYNC` ко всем запросам `DROP` и `DETACH`.

View File

@ -8,13 +8,8 @@ sidebar_label: Географические структуры
ClickHouse поддерживает типы данных для отображения географических объектов — точек (местоположений), территорий и т.п.
:::danger "Предупреждение"
Сейчас использование типов данных для работы с географическими структурами является экспериментальной возможностью. Чтобы использовать эти типы данных, включите настройку `allow_experimental_geo_types = 1`.
:::
**См. также**
- [Хранение географических структур данных](https://ru.wikipedia.org/wiki/GeoJSON).
- Настройка [allow_experimental_geo_types](../../operations/settings/settings.md#allow-experimental-geo-types).
## Point {#point-data-type}
@ -25,7 +20,6 @@ ClickHouse поддерживает типы данных для отображ
Запрос:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_point (p Point) ENGINE = Memory();
INSERT INTO geo_point VALUES((10, 10));
SELECT p, toTypeName(p) FROM geo_point;
@ -47,7 +41,6 @@ SELECT p, toTypeName(p) FROM geo_point;
Запрос:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_ring (r Ring) ENGINE = Memory();
INSERT INTO geo_ring VALUES([(0, 0), (10, 0), (10, 10), (0, 10)]);
SELECT r, toTypeName(r) FROM geo_ring;
@ -69,7 +62,6 @@ SELECT r, toTypeName(r) FROM geo_ring;
Запись в этой таблице описывает многоугольник с одной дырой:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_polygon (pg Polygon) ENGINE = Memory();
INSERT INTO geo_polygon VALUES([[(20, 20), (50, 20), (50, 50), (20, 50)], [(30, 30), (50, 50), (50, 30)]]);
SELECT pg, toTypeName(pg) FROM geo_polygon;
@ -92,7 +84,6 @@ SELECT pg, toTypeName(pg) FROM geo_polygon;
Запись в этой таблице описывает элемент, состоящий из двух многоугольников — первый без дыр, а второй с одной дырой:
```sql
SET allow_experimental_geo_types = 1;
CREATE TABLE geo_multipolygon (mpg MultiPolygon) ENGINE = Memory();
INSERT INTO geo_multipolygon VALUES([[[(0, 0), (10, 0), (10, 10), (0, 10)]], [[(20, 20), (50, 20), (50, 50), (20, 50)],[(30, 30), (50, 50), (50, 30)]]]);
SELECT mpg, toTypeName(mpg) FROM geo_multipolygon;

View File

@ -4,10 +4,10 @@ if (NOT(
AND CMAKE_HOST_SYSTEM_PROCESSOR STREQUAL CMAKE_SYSTEM_PROCESSOR
)
)
set (COMPRESSOR "${CMAKE_BINARY_DIR}/native/utils/self-extracting-executable/pre_compressor")
set (DECOMPRESSOR "--decompressor=${CMAKE_BINARY_DIR}/utils/self-extracting-executable/decompressor")
set (COMPRESSOR "${PROJECT_BINARY_DIR}/native/utils/self-extracting-executable/pre_compressor")
set (DECOMPRESSOR "--decompressor=${PROJECT_BINARY_DIR}/utils/self-extracting-executable/decompressor")
else ()
set (COMPRESSOR "${CMAKE_BINARY_DIR}/utils/self-extracting-executable/compressor")
set (COMPRESSOR "${PROJECT_BINARY_DIR}/utils/self-extracting-executable/compressor")
endif ()
add_custom_target (self-extracting ALL

View File

@ -719,8 +719,12 @@
<!-- Default profile of settings. -->
<default_profile>default</default_profile>
<!-- Comma-separated list of prefixes for user-defined settings. -->
<custom_settings_prefixes></custom_settings_prefixes>
<!-- Comma-separated list of prefixes for user-defined settings.
The server will allow to set these settings, and retrieve them with the getSetting function.
They are also logged in the query_log, similarly to other settings, but have no special effect.
The "SQL_" prefix is introduced for compatibility with MySQL - these settings are being set be Tableau.
-->
<custom_settings_prefixes>SQL_</custom_settings_prefixes>
<!-- System profile of settings. This settings are used by internal processes (Distributed DDL worker and so on). -->
<!-- <system_profile>default</system_profile> -->

View File

@ -38,6 +38,9 @@ public:
if (!query->hasGroupBy())
return;
if (query->isGroupByWithCube() || query->isGroupByWithRollup())
return;
auto & group_by = query->getGroupBy().getNodes();
if (query->isGroupByWithGroupingSets())
{

View File

@ -255,6 +255,7 @@ std::unique_ptr<WriteBuffer> BackupWriterS3::writeFile(const String & file_name)
client,
s3_uri.bucket,
fs::path(s3_uri.key) / file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
std::nullopt,
threadPoolCallbackRunner<void>(BackupsIOThreadPool::get(), "BackupWriterS3"),

View File

@ -528,7 +528,7 @@ target_link_libraries (clickhouse_common_io PUBLIC ch_contrib::fast_float)
if (USE_ORC)
dbms_target_link_libraries(PUBLIC ${ORC_LIBRARIES})
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ORC_INCLUDE_DIR} "${CMAKE_BINARY_DIR}/contrib/orc/c++/include")
dbms_target_include_directories(SYSTEM BEFORE PUBLIC ${ORC_INCLUDE_DIR} "${PROJECT_BINARY_DIR}/contrib/orc/c++/include")
endif ()
if (TARGET ch_contrib::rocksdb)

View File

@ -4,6 +4,8 @@
namespace ProfileEvents
{
extern const Event DistributedConnectionTries;
extern const Event DistributedConnectionUsable;
extern const Event DistributedConnectionMissingTable;
extern const Event DistributedConnectionStaleReplica;
}
@ -35,6 +37,7 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
SCOPE_EXIT(is_finished = true);
try
{
ProfileEvents::increment(ProfileEvents::DistributedConnectionTries);
result.entry = pool->get(*timeouts, settings, /* force_connected = */ false);
AsyncCallbackSetter async_setter(&*result.entry, std::move(async_callback));
@ -45,6 +48,7 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
if (!table_to_check || server_revision < DBMS_MIN_REVISION_WITH_TABLES_STATUS)
{
result.entry->forceConnected(*timeouts);
ProfileEvents::increment(ProfileEvents::DistributedConnectionUsable);
result.is_usable = true;
result.is_up_to_date = true;
return;
@ -65,6 +69,7 @@ void ConnectionEstablisher::run(ConnectionEstablisher::TryResult & result, std::
return;
}
ProfileEvents::increment(ProfileEvents::DistributedConnectionUsable);
result.is_usable = true;
UInt64 max_allowed_delay = settings ? UInt64(settings->max_replica_delay_for_distributed_queries) : 0;

View File

@ -135,7 +135,6 @@ private:
Protocol::Compression compression; /// Whether to compress data when interacting with the server.
Protocol::Secure secure; /// Whether to encrypt data when interacting with the server.
Int64 priority; /// priority from <remote_servers>
};
/**
@ -192,6 +191,7 @@ inline bool operator==(const ConnectionPoolFactory::Key & lhs, const ConnectionP
{
return lhs.max_connections == rhs.max_connections && lhs.host == rhs.host && lhs.port == rhs.port
&& lhs.default_database == rhs.default_database && lhs.user == rhs.user && lhs.password == rhs.password
&& lhs.quota_key == rhs.quota_key
&& lhs.cluster == rhs.cluster && lhs.cluster_secret == rhs.cluster_secret && lhs.client_name == rhs.client_name
&& lhs.compression == rhs.compression && lhs.secure == rhs.secure && lhs.priority == rhs.priority;
}

View File

@ -73,9 +73,9 @@ IConnectionPool::Entry ConnectionPoolWithFailover::get(const ConnectionTimeouts
Int64 ConnectionPoolWithFailover::getPriority() const
{
return (*std::max_element(nested_pools.begin(), nested_pools.end(), [](const auto &a, const auto &b)
return (*std::max_element(nested_pools.begin(), nested_pools.end(), [](const auto & a, const auto & b)
{
return a->getPriority() - b->getPriority();
return a->getPriority() < b->getPriority();
}))->getPriority();
}

View File

@ -10,6 +10,10 @@
* Instead of this class, you could just use the pair (version, key) in the HashSet as the key
* but then the table would accumulate all the keys that it ever stored, and it was unreasonably growing.
* This class goes a step further and considers the keys with the old version empty in the hash table.
*
* Zero values note:
* A cell in ClearableHashSet can store a zero values as normal value
* If its version is equal to the version of the set itself, then it's not considered as empty even key's value is zero value of the corresponding type
*/
@ -48,30 +52,6 @@ struct ClearableHashTableCell : public BaseCell
ClearableHashTableCell(const Key & key_, const State & state) : BaseCell(key_, state), version(state.version) {}
};
using StringRefBaseCell = HashSetCellWithSavedHash<StringRef, DefaultHash<StringRef>, ClearableHashSetState>;
/// specialization for StringRef to allow zero size key (empty string)
template <>
struct ClearableHashTableCell<StringRef, StringRefBaseCell> : public StringRefBaseCell
{
using State = ClearableHashSetState;
using value_type = typename StringRefBaseCell::value_type;
UInt32 version;
bool isZero(const State & state) const { return version != state.version; }
static bool isZero(const StringRef & key_, const State & state_) { return StringRefBaseCell::isZero(key_, state_); }
/// Set the key value to zero.
void setZero() { version = 0; }
/// Do I need to store the zero key separately (that is, can a zero key be inserted into the hash table).
static constexpr bool need_zero_value_storage = true;
ClearableHashTableCell() { } /// NOLINT
ClearableHashTableCell(const StringRef & key_, const State & state) : StringRefBaseCell(key_, state), version(state.version) { }
};
template <
typename Key,
typename Hash = DefaultHash<Key>,
@ -90,13 +70,6 @@ public:
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
@ -119,13 +92,6 @@ public:
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};

View File

@ -358,7 +358,7 @@ public:
std::pair<LookupResult, bool> res;
emplace(Cell::getKey(x), res.first, res.second);
if (res.second)
insertSetMapped(res.first->getMapped(), x);
res.first->setMapped(x);
return res;
}

View File

@ -9,6 +9,8 @@
/** NOTE HashMap could only be used for memmoveable (position independent) types.
* Example: std::string is not position independent in libstdc++ with C++11 ABI or in libc++.
* Also, key in hash table must be of type, that zero bytes is compared equals to zero key.
*
* Please keep in sync with PackedHashMap.h
*/
namespace DB
@ -53,13 +55,13 @@ PairNoInit<std::decay_t<First>, std::decay_t<Second>> makePairNoInit(First && fi
}
template <typename Key, typename TMapped, typename Hash, typename TState = HashTableNoState>
template <typename Key, typename TMapped, typename Hash, typename TState = HashTableNoState, typename Pair = PairNoInit<Key, TMapped>>
struct HashMapCell
{
using Mapped = TMapped;
using State = TState;
using value_type = PairNoInit<Key, Mapped>;
using value_type = Pair;
using mapped_type = Mapped;
using key_type = Key;
@ -151,14 +153,14 @@ struct HashMapCell
namespace std
{
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_size<HashMapCell<Key, TMapped, Hash, TState>> : std::integral_constant<size_t, 2> { };
template <typename Key, typename TMapped, typename Hash, typename TState, typename Pair>
struct tuple_size<HashMapCell<Key, TMapped, Hash, TState, Pair>> : std::integral_constant<size_t, 2> { };
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_element<0, HashMapCell<Key, TMapped, Hash, TState>> { using type = Key; };
template <typename Key, typename TMapped, typename Hash, typename TState, typename Pair>
struct tuple_element<0, HashMapCell<Key, TMapped, Hash, TState, Pair>> { using type = Key; };
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_element<1, HashMapCell<Key, TMapped, Hash, TState>> { using type = TMapped; };
template <typename Key, typename TMapped, typename Hash, typename TState, typename Pair>
struct tuple_element<1, HashMapCell<Key, TMapped, Hash, TState, Pair>> { using type = TMapped; };
}
template <typename Key, typename TMapped, typename Hash, typename TState = HashTableNoState>

View File

@ -41,6 +41,8 @@ public:
using Base = HashTable<Key, TCell, Hash, Grower, Allocator>;
using typename Base::LookupResult;
using Base::Base;
void merge(const Self & rhs)
{
if (!this->hasZero() && rhs.hasZero())

View File

@ -117,7 +117,7 @@ inline bool bitEquals(T && a, T && b)
* 3) Hash tables that store the key and do not have a "mapped" value, e.g. the normal HashTable.
* GetKey returns the key, and GetMapped returns a zero void pointer. This simplifies generic
* code that works with mapped values: it can overload on the return type of GetMapped(), and
* doesn't need other parameters. One example is insertSetMapped() function.
* doesn't need other parameters. One example is Cell::setMapped() function.
*
* 4) Hash tables that store both the key and the "mapped" value, e.g. HashMap. Both GetKey and
* GetMapped are supported.
@ -216,17 +216,6 @@ struct HashTableCell
};
/**
* A helper function for HashTable::insert() to set the "mapped" value.
* Overloaded on the mapped type, does nothing if it's VoidMapped.
*/
template <typename ValueType>
void insertSetMapped(VoidMapped /* dest */, const ValueType & /* src */) {}
template <typename MappedType, typename ValueType>
void insertSetMapped(MappedType & dest, const ValueType & src) { dest = src.second; }
/** Determines the size of the hash table, and when and how much it should be resized.
* Has very small state (one UInt8) and useful for Set-s allocated in automatic memory (see uniqExact as an example).
*/
@ -241,6 +230,8 @@ struct HashTableGrower
/// If collision resolution chains are contiguous, we can implement erase operation by moving the elements.
static constexpr auto performs_linear_probing_with_single_step = true;
static constexpr size_t max_size_degree = 23;
/// The size of the hash table in the cells.
size_t bufSize() const { return 1ULL << size_degree; }
@ -259,17 +250,18 @@ struct HashTableGrower
/// Increase the size of the hash table.
void increaseSize()
{
size_degree += size_degree >= 23 ? 1 : 2;
size_degree += size_degree >= max_size_degree ? 1 : 2;
}
/// Set the buffer size by the number of elements in the hash table. Used when deserializing a hash table.
void set(size_t num_elems)
{
size_degree = num_elems <= 1
? initial_size_degree
: ((initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
? initial_size_degree
: (static_cast<size_t>(log2(num_elems - 1)) + 2));
if (num_elems <= 1)
size_degree = initial_size_degree;
else if (initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
size_degree = initial_size_degree;
else
size_degree = static_cast<size_t>(log2(num_elems - 1)) + 2;
}
void setBufSize(size_t buf_size_)
@ -281,6 +273,7 @@ struct HashTableGrower
/** Determines the size of the hash table, and when and how much it should be resized.
* This structure is aligned to cache line boundary and also occupies it all.
* Precalculates some values to speed up lookups and insertion into the HashTable (and thus has bigger memory footprint than HashTableGrower).
* This grower assume 0.5 load factor
*/
template <size_t initial_size_degree = 8>
class alignas(64) HashTableGrowerWithPrecalculation
@ -290,6 +283,7 @@ class alignas(64) HashTableGrowerWithPrecalculation
UInt8 size_degree = initial_size_degree;
size_t precalculated_mask = (1ULL << initial_size_degree) - 1;
size_t precalculated_max_fill = 1ULL << (initial_size_degree - 1);
static constexpr size_t max_size_degree = 23;
public:
UInt8 sizeDegree() const { return size_degree; }
@ -319,16 +313,17 @@ public:
bool overflow(size_t elems) const { return elems > precalculated_max_fill; }
/// Increase the size of the hash table.
void increaseSize() { increaseSizeDegree(size_degree >= 23 ? 1 : 2); }
void increaseSize() { increaseSizeDegree(size_degree >= max_size_degree ? 1 : 2); }
/// Set the buffer size by the number of elements in the hash table. Used when deserializing a hash table.
void set(size_t num_elems)
{
size_degree = num_elems <= 1
? initial_size_degree
: ((initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
? initial_size_degree
: (static_cast<size_t>(log2(num_elems - 1)) + 2));
if (num_elems <= 1)
size_degree = initial_size_degree;
else if (initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
size_degree = initial_size_degree;
else
size_degree = static_cast<size_t>(log2(num_elems - 1)) + 2;
increaseSizeDegree(0);
}
@ -753,6 +748,7 @@ protected:
public:
using key_type = Key;
using grower_type = Grower;
using mapped_type = typename Cell::mapped_type;
using value_type = typename Cell::value_type;
using cell_type = Cell;
@ -770,6 +766,14 @@ public:
alloc(grower);
}
explicit HashTable(const Grower & grower_)
: grower(grower_)
{
if (Cell::need_zero_value_storage)
this->zeroValue()->setZero();
alloc(grower);
}
HashTable(size_t reserve_for_num_elements) /// NOLINT
{
if (Cell::need_zero_value_storage)
@ -1037,7 +1041,7 @@ public:
}
if (res.second)
insertSetMapped(res.first->getMapped(), x);
res.first->setMapped(x);
return res;
}

View File

@ -88,8 +88,12 @@ inline StringRef & ALWAYS_INLINE keyHolderGetKey(DB::ArenaKeyHolder & holder)
inline void ALWAYS_INLINE keyHolderPersistKey(DB::ArenaKeyHolder & holder)
{
// Hash table shouldn't ask us to persist a zero key
assert(holder.key.size > 0);
// Normally, our hash table shouldn't ask to persist a zero key,
// but it can happened in the case of clearable hash table (ClearableHashSet, for example).
// The clearable hash table doesn't use zero storage and
// distinguishes empty keys by using cell version, not the value itself.
// So, when an empty StringRef is inserted in ClearableHashSet we'll get here key of zero size.
// assert(holder.key.size > 0);
holder.key.data = holder.pool.insert(holder.key.data, holder.key.size);
}

View File

@ -0,0 +1,107 @@
#pragma once
/// Packed versions HashMap, please keep in sync with HashMap.h
#include <Common/HashTable/HashMap.h>
/// A pair that does not initialize the elements, if not needed.
///
/// NOTE: makePairNoInit() is omitted for PackedPairNoInit since it is not
/// required for PackedHashMap (see mergeBlockWithPipe() for details)
template <typename First, typename Second>
struct __attribute__((packed)) PackedPairNoInit
{
First first;
Second second;
PackedPairNoInit() {} /// NOLINT
template <typename FirstValue>
PackedPairNoInit(FirstValue && first_, NoInitTag)
: first(std::forward<FirstValue>(first_))
{
}
template <typename FirstValue, typename SecondValue>
PackedPairNoInit(FirstValue && first_, SecondValue && second_)
: first(std::forward<FirstValue>(first_))
, second(std::forward<SecondValue>(second_))
{
}
};
/// The difference with ZeroTraits is that PackedZeroTraits accepts PackedPairNoInit instead of Key.
namespace PackedZeroTraits
{
template <typename First, typename Second, template <typename, typename> class PackedPairNoInit>
bool check(const PackedPairNoInit<First, Second> p) { return p.first == First{}; }
template <typename First, typename Second, template <typename, typename> class PackedPairNoInit>
void set(PackedPairNoInit<First, Second> & p) { p.first = First{}; }
}
/// setZero() should be overwritten to pass the pair instead of key, to avoid
/// "reference binding to misaligned address" errors from UBsan.
template <typename Key, typename TMapped, typename Hash, typename TState = HashTableNoState>
struct PackedHashMapCell : public HashMapCell<Key, TMapped, Hash, TState, PackedPairNoInit<Key, TMapped>>
{
using Base = HashMapCell<Key, TMapped, Hash, TState, PackedPairNoInit<Key, TMapped>>;
using State = typename Base::State;
using value_type = typename Base::value_type;
using key_type = typename Base::key_type;
using Mapped = typename Base::Mapped;
using Base::Base;
void setZero() { PackedZeroTraits::set(this->value); }
Key getKey() const { return this->value.first; }
static Key getKey(const value_type & value_) { return value_.first; }
Mapped & getMapped() { return this->value.second; }
Mapped getMapped() const { return this->value.second; }
value_type getValue() const { return this->value; }
bool keyEquals(const Key key_) const { return bitEqualsByValue(this->value.first, key_); }
bool keyEquals(const Key key_, size_t /*hash_*/) const { return bitEqualsByValue(this->value.first, key_); }
bool keyEquals(const Key key_, size_t /*hash_*/, const State & /*state*/) const { return bitEqualsByValue(this->value.first, key_); }
bool isZero(const State & state) const { return isZero(this->value.first, state); }
static bool isZero(const Key key, const State & /*state*/) { return ZeroTraits::check(key); }
static inline bool bitEqualsByValue(key_type a, key_type b) { return a == b; }
template <size_t I>
auto get() const
{
if constexpr (I == 0) return this->value.first;
else if constexpr (I == 1) return this->value.second;
}
};
namespace std
{
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_size<PackedHashMapCell<Key, TMapped, Hash, TState>> : std::integral_constant<size_t, 2> { };
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_element<0, PackedHashMapCell<Key, TMapped, Hash, TState>> { using type = Key; };
template <typename Key, typename TMapped, typename Hash, typename TState>
struct tuple_element<1, PackedHashMapCell<Key, TMapped, Hash, TState>> { using type = TMapped; };
}
/// Packed HashMap - HashMap with structure without padding
///
/// Sometimes padding in structure can be crucial, consider the following
/// example <UInt64, UInt16> as <Key, Value> in this case the padding overhead
/// is 0.375, and this can be major in case of lots of keys.
///
/// Note, there is no need to provide PackedHashSet, since it cannot have padding.
template <
typename Key,
typename Mapped,
typename Hash = DefaultHash<Key>,
typename Grower = HashTableGrower<>,
typename Allocator = HashTableAllocator>
using PackedHashMap = HashMapTable<Key, PackedHashMapCell<Key, Mapped, Hash, HashTableNoState>, Hash, Grower, Allocator>;

View File

@ -224,7 +224,7 @@ public:
emplace(Cell::getKey(x), res.first, res.second, hash_value);
if (res.second)
insertSetMapped(res.first->getMapped(), x);
res.first->setMapped(x);
return res;
}

View File

@ -63,12 +63,13 @@ const char * analyzeImpl(
bool is_first_call = begin == regexp.data();
int depth = 0;
is_trivial = true;
bool is_prefix = true;
required_substring.clear();
bool has_alternative_on_depth_0 = false;
bool has_case_insensitive_flag = false;
/// Substring with a position.
using Substring = std::pair<std::string, size_t>;
/// Substring with is_prefix.
using Substring = std::pair<std::string, bool>;
using Substrings = std::vector<Substring>;
Substrings trivial_substrings(1);
@ -98,6 +99,9 @@ const char * analyzeImpl(
auto finish_non_trivial_char = [&](bool create_new_substr = true)
{
is_trivial = false;
if (create_new_substr)
is_prefix = false;
if (depth != 0)
return;
@ -106,6 +110,7 @@ const char * analyzeImpl(
if (alter.suffix)
{
alter.literal += last_substring->first;
alter.suffix = false;
}
}
@ -126,16 +131,24 @@ const char * analyzeImpl(
if (alter.prefix)
{
alter.literal = last_substring->first + alter.literal;
alter.prefix = is_prefix;
}
}
if (group_required_string.prefix)
{
last_substring->first += group_required_string.literal;
last_substring->second = is_prefix;
}
else
{
finish_non_trivial_char();
last_substring->first = group_required_string.literal;
last_substring->second = false;
}
is_prefix = is_prefix && group_required_string.prefix && group_required_string.suffix;
/// if we can still append, no need to finish it. e.g. abc(de)fg should capture abcdefg
if (!last_substring->first.empty() && !group_required_string.suffix)
{
@ -185,7 +198,6 @@ const char * analyzeImpl(
goto ordinary;
default:
/// all other escape sequences are not supported
is_trivial = false;
finish_non_trivial_char();
break;
}
@ -196,6 +208,7 @@ const char * analyzeImpl(
case '|':
is_trivial = false;
is_prefix = false;
++pos;
if (depth == 0)
{
@ -205,6 +218,7 @@ const char * analyzeImpl(
break;
case '(':
/// bracket does not break is_prefix. for example abc(d) has a prefix 'abcd'
is_trivial = false;
if (!in_square_braces)
{
@ -258,7 +272,6 @@ const char * analyzeImpl(
case '[':
in_square_braces = true;
++depth;
is_trivial = false;
finish_non_trivial_char();
++pos;
break;
@ -270,7 +283,6 @@ const char * analyzeImpl(
--depth;
if (depth == 0)
in_square_braces = false;
is_trivial = false;
finish_non_trivial_char();
++pos;
break;
@ -284,7 +296,6 @@ const char * analyzeImpl(
break;
case '^': case '$': case '.': case '+':
is_trivial = false;
finish_non_trivial_char();
++pos;
break;
@ -296,7 +307,6 @@ const char * analyzeImpl(
case '?':
[[fallthrough]];
case '*':
is_trivial = false;
if (depth == 0 && !last_substring->first.empty() && !in_square_braces)
{
last_substring->first.resize(last_substring->first.size() - 1);
@ -318,8 +328,9 @@ const char * analyzeImpl(
default:
if (depth == 0 && !in_curly_braces && !in_square_braces)
{
/// record the first position of last string.
if (last_substring->first.empty())
last_substring->second = pos - begin;
last_substring->second = is_prefix;
last_substring->first.push_back(*pos);
}
++pos;
@ -328,10 +339,9 @@ const char * analyzeImpl(
}
finish:
finish_non_trivial_char(false);
if (!is_trivial)
{
finish_non_trivial_char(false);
/// we calculate required substring even though has_alternative_on_depth_0.
/// we will clear the required substring after putting it to alternatives.
if (!has_case_insensitive_flag)
@ -357,7 +367,7 @@ finish:
if (max_length >= MIN_LENGTH_FOR_STRSTR || (!is_first_call && max_length > 0))
{
required_substring.literal = candidate_it->first;
required_substring.prefix = candidate_it->second == 0;
required_substring.prefix = candidate_it->second;
required_substring.suffix = candidate_it + 1 == trivial_substrings.end();
}
}
@ -365,7 +375,8 @@ finish:
else if (!trivial_substrings.empty())
{
required_substring.literal = trivial_substrings.front().first;
required_substring.prefix = trivial_substrings.front().second == 0;
/// trivial string means the whole regex is a simple string literal, so the prefix and suffix should be true.
required_substring.prefix = true;
required_substring.suffix = true;
}

View File

@ -7,7 +7,13 @@
#include <Common/logger_useful.h>
#include <Common/Exception.h>
#include <Common/ProfileEvents.h>
#include <Common/Stopwatch.h>
namespace ProfileEvents
{
extern const Event ConnectionPoolIsFullMicroseconds;
}
namespace DB
{
@ -144,17 +150,19 @@ public:
return Entry(*items.back());
}
Stopwatch blocked;
if (timeout < 0)
{
LOG_INFO(log, "No free connections in pool. Waiting undefinitelly.");
LOG_INFO(log, "No free connections in pool. Waiting indefinitely.");
available.wait(lock);
}
else
{
auto timeout_ms = std::chrono::microseconds(timeout);
auto timeout_ms = std::chrono::milliseconds(timeout);
LOG_INFO(log, "No free connections in pool. Waiting {} ms.", timeout_ms.count());
available.wait_for(lock, timeout_ms);
}
ProfileEvents::increment(ProfileEvents::ConnectionPoolIsFullMicroseconds, blocked.elapsedMicroseconds());
}
}

View File

@ -101,7 +101,7 @@ public:
struct ShuffledPool
{
NestedPool * pool{};
const PoolState * state{};
const PoolState * state{}; // WARNING: valid only during initial ordering, dangling
size_t index = 0;
size_t error_count = 0;
size_t slowdown_count = 0;
@ -115,7 +115,6 @@ public:
/// this functor. The pools with lower result value will be tried first.
using GetPriorityFunc = std::function<size_t(size_t index)>;
/// Returns at least min_entries and at most max_entries connections (at most one connection per nested pool).
/// The method will throw if it is unable to get min_entries alive connections or
/// if fallback_to_stale_replicas is false and it is unable to get min_entries connections to up-to-date replicas.
@ -175,10 +174,11 @@ PoolWithFailoverBase<TNestedPool>::getShuffledPools(
}
/// Sort the pools into order in which they will be tried (based on respective PoolStates).
/// Note that `error_count` and `slowdown_count` are used for ordering, but set to zero in the resulting ShuffledPool
std::vector<ShuffledPool> shuffled_pools;
shuffled_pools.reserve(nested_pools.size());
for (size_t i = 0; i < nested_pools.size(); ++i)
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, 0});
shuffled_pools.push_back(ShuffledPool{nested_pools[i].get(), &pool_states[i], i, /* error_count = */ 0, /* slowdown_count = */ 0});
::sort(
shuffled_pools.begin(), shuffled_pools.end(),
[](const ShuffledPool & lhs, const ShuffledPool & rhs)
@ -227,6 +227,10 @@ PoolWithFailoverBase<TNestedPool>::getMany(
{
std::vector<ShuffledPool> shuffled_pools = getShuffledPools(max_ignored_errors, get_priority);
/// Limit `max_tries` value by `max_error_cap` to avoid unlimited number of retries
if (max_tries > max_error_cap)
max_tries = max_error_cap;
/// We will try to get a connection from each pool until a connection is produced or max_tries is reached.
std::vector<TryResult> try_results(shuffled_pools.size());
size_t entries_count = 0;
@ -371,7 +375,7 @@ PoolWithFailoverBase<TNestedPool>::updatePoolStates(size_t max_ignored_errors)
/// distributed_replica_max_ignored_errors
for (auto & state : result)
state.error_count = std::max<UInt64>(0, state.error_count - max_ignored_errors);
state.error_count = state.error_count > max_ignored_errors ? state.error_count - max_ignored_errors : 0;
return result;
}

View File

@ -131,6 +131,8 @@
M(ZooKeeperBytesSent, "Number of bytes send over network while communicating with ZooKeeper.") \
M(ZooKeeperBytesReceived, "Number of bytes received over network while communicating with ZooKeeper.") \
\
M(DistributedConnectionTries, "Total count of distributed connection attempts.") \
M(DistributedConnectionUsable, "Total count of successful distributed connections to a usable server (with required table, but maybe stale).") \
M(DistributedConnectionFailTry, "Total count when distributed connection fails with retry.") \
M(DistributedConnectionMissingTable, "Number of times we rejected a replica from a distributed query, because it did not contain a table needed for the query.") \
M(DistributedConnectionStaleReplica, "Number of times we rejected a replica from a distributed query, because some table needed for a query had replication lag higher than the configured threshold.") \
@ -501,6 +503,8 @@ The server successfully detected this situation and will download merged part fr
M(MergeTreeReadTaskRequestsSentElapsedMicroseconds, "Time spent in callbacks requested from the remote server back to the initiator server to choose the read task (for MergeTree tables). Measured on the remote server side.") \
M(MergeTreeAllRangesAnnouncementsSentElapsedMicroseconds, "Time spent in sending the announcement from the remote server to the initiator server about the set of data parts (for MergeTree tables). Measured on the remote server side.") \
\
M(ConnectionPoolIsFullMicroseconds, "Total time spent waiting for a slot in connection pool.") \
\
M(LogTest, "Number of log messages with level Test") \
M(LogTrace, "Number of log messages with level Trace") \
M(LogDebug, "Number of log messages with level Debug") \

View File

@ -61,20 +61,20 @@ UInt64 Throttler::add(size_t amount)
throw Exception::createDeprecated(limit_exceeded_exception_message + std::string(" Maximum: ") + toString(limit), ErrorCodes::LIMIT_EXCEEDED);
/// Wait unless there is positive amount of tokens - throttling
Int64 sleep_time = 0;
Int64 sleep_time_ns = 0;
if (max_speed && tokens_value < 0)
{
sleep_time = static_cast<Int64>(-tokens_value / max_speed * NS);
accumulated_sleep += sleep_time;
sleepForNanoseconds(sleep_time);
accumulated_sleep -= sleep_time;
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time / 1000UL);
sleep_time_ns = static_cast<Int64>(-tokens_value / max_speed * NS);
accumulated_sleep += sleep_time_ns;
sleepForNanoseconds(sleep_time_ns);
accumulated_sleep -= sleep_time_ns;
ProfileEvents::increment(ProfileEvents::ThrottlerSleepMicroseconds, sleep_time_ns / 1000UL);
}
if (parent)
sleep_time += parent->add(amount);
sleep_time_ns += parent->add(amount);
return static_cast<UInt64>(sleep_time);
return static_cast<UInt64>(sleep_time_ns);
}
void Throttler::reset()

View File

@ -34,15 +34,15 @@ public:
const std::shared_ptr<Throttler> & parent_ = nullptr);
/// Use `amount` tokens, sleeps if required or throws exception on limit overflow.
/// Returns duration of sleep in microseconds (to distinguish sleeping on different kinds of throttlers for metrics)
/// Returns duration of sleep in nanoseconds (to distinguish sleeping on different kinds of throttlers for metrics)
UInt64 add(size_t amount);
UInt64 add(size_t amount, ProfileEvents::Event event_amount, ProfileEvents::Event event_sleep_us)
{
UInt64 sleep_us = add(amount);
UInt64 sleep_ns = add(amount);
ProfileEvents::increment(event_amount, amount);
ProfileEvents::increment(event_sleep_us, sleep_us);
return sleep_us;
ProfileEvents::increment(event_sleep_us, sleep_ns / 1000UL);
return sleep_ns;
}
/// Not thread safe

View File

@ -466,6 +466,8 @@ public:
/// Useful to check owner of ephemeral node.
virtual int64_t getSessionID() const = 0;
virtual String getConnectedAddress() const = 0;
/// If the method will throw an exception, callbacks won't be called.
///
/// After the method is executed successfully, you must wait for callbacks

View File

@ -39,6 +39,7 @@ public:
bool isExpired() const override { return expired; }
int64_t getSessionID() const override { return 0; }
String getConnectedAddress() const override { return connected_zk_address; }
void create(
@ -126,6 +127,8 @@ private:
zkutil::ZooKeeperArgs args;
String connected_zk_address;
std::mutex push_request_mutex;
std::atomic<bool> expired{false};

View File

@ -111,6 +111,26 @@ void ZooKeeper::init(ZooKeeperArgs args_)
LOG_TRACE(log, "Initialized, hosts: {}", fmt::join(args.hosts, ","));
else
LOG_TRACE(log, "Initialized, hosts: {}, chroot: {}", fmt::join(args.hosts, ","), args.chroot);
String address = impl->getConnectedAddress();
size_t colon_pos = address.find(':');
connected_zk_host = address.substr(0, colon_pos);
connected_zk_port = address.substr(colon_pos + 1);
connected_zk_index = 0;
if (args.hosts.size() > 1)
{
for (size_t i = 0; i < args.hosts.size(); i++)
{
if (args.hosts[i] == address)
{
connected_zk_index = i;
break;
}
}
}
}
else if (args.implementation == "testkeeper")
{

View File

@ -523,6 +523,10 @@ public:
void setServerCompletelyStarted();
String getConnectedZooKeeperHost() const { return connected_zk_host; }
String getConnectedZooKeeperPort() const { return connected_zk_port; }
size_t getConnectedZooKeeperIndex() const { return connected_zk_index; }
private:
void init(ZooKeeperArgs args_);
@ -586,6 +590,10 @@ private:
ZooKeeperArgs args;
String connected_zk_host;
String connected_zk_port;
size_t connected_zk_index;
std::mutex mutex;
Poco::Logger * log = nullptr;

View File

@ -433,6 +433,8 @@ void ZooKeeper::connect(
}
connected = true;
connected_zk_address = node.address.toString();
break;
}
catch (...)
@ -448,6 +450,8 @@ void ZooKeeper::connect(
if (!connected)
{
WriteBufferFromOwnString message;
connected_zk_address = "";
message << "All connection tries failed while connecting to ZooKeeper. nodes: ";
bool first = true;
for (const auto & node : nodes)

View File

@ -125,6 +125,8 @@ public:
/// Useful to check owner of ephemeral node.
int64_t getSessionID() const override { return session_id; }
String getConnectedAddress() const override { return connected_zk_address; }
void executeGenericRequest(
const ZooKeeperRequestPtr & request,
ResponseCallback callback);
@ -201,6 +203,7 @@ public:
private:
ACLs default_acls;
String connected_zk_address;
zkutil::ZooKeeperArgs args;

View File

@ -4,37 +4,40 @@
TEST(OptimizeRE, analyze)
{
auto test_f = [](const std::string & regexp, const std::string & answer, std::vector<std::string> expect_alternatives = {}, bool trival_expected = false)
auto test_f = [](const std::string & regexp, const std::string & required, std::vector<std::string> expect_alternatives = {}, bool trival_expected = false, bool prefix_expected = false)
{
std::string required;
std::string answer;
bool is_trivial;
bool is_prefix;
std::vector<std::string> alternatives;
OptimizedRegularExpression::analyze(regexp, required, is_trivial, is_prefix, alternatives);
OptimizedRegularExpression::analyze(regexp, answer, is_trivial, is_prefix, alternatives);
std::cerr << regexp << std::endl;
EXPECT_EQ(required, answer);
EXPECT_EQ(alternatives, expect_alternatives);
EXPECT_EQ(is_trivial, trival_expected);
EXPECT_EQ(is_prefix, prefix_expected);
};
test_f("abc", "abc", {}, true);
test_f("abc", "abc", {}, true, true);
test_f("c([^k]*)de", "");
test_f("abc(de)fg", "abcdefg");
test_f("abc(de|xyz)fg", "abc", {"abcdefg", "abcxyzfg"});
test_f("abc(de?f|xyz)fg", "abc", {"abcd", "abcxyzfg"});
test_f("abc(de)fg", "abcdefg", {}, false, true);
test_f("abc(de|xyz)fg", "abc", {"abcdefg", "abcxyzfg"}, false, true);
test_f("abc(de?f|xyz)fg", "abc", {"abcd", "abcxyzfg"}, false, true);
test_f("abc|fgk|xyz", "", {"abc","fgk", "xyz"});
test_f("(abc)", "abc");
test_f("(abc)", "abc", {}, false, true);
test_f("(abc|fgk)", "", {"abc","fgk"});
test_f("(abc|fgk)(e|f|zkh|)", "", {"abc","fgk"});
test_f("abc(abc|fg)xyzz", "xyzz", {"abcabcxyzz","abcfgxyzz"});
test_f("((abc|fg)kkk*)xyzz", "xyzz", {"abckk", "fgkk"});
test_f("abc(*(abc|fg)*)xyzz", "xyzz");
test_f("abc[k]xyzz", "xyzz");
test_f("(abc[k]xyzz)", "xyzz");
test_f("abc((de)fg(hi))jk", "abcdefghijk");
test_f("abc((?:de)fg(?:hi))jk", "abcdefghijk");
test_f("abc((de)fghi+zzz)jk", "abcdefghi");
test_f("abc((de)fg(hi))?jk", "abc");
test_f("abc((de)fghi?zzz)jk", "abcdefgh");
test_f("abc((de)fg(hi))jk", "abcdefghijk", {}, false, true);
test_f("abc((?:de)fg(?:hi))jk", "abcdefghijk", {}, false, true);
test_f("abc((de)fghi+zzz)jk", "abcdefghi", {}, false, true);
test_f("abc((de)fg(hi))?jk", "abc", {}, false, true);
test_f("abc((de)fghi?zzz)jk", "abcdefgh", {}, false, true);
test_f("abc(*cd)jk", "cdjk");
test_f(R"(abc(de|xyz|(\{xx\}))fg)", "abc", {"abcdefg", "abcxyzfg", "abc{xx}fg"});
test_f(R"(abc(de|xyz|(\{xx\}))fg)", "abc", {"abcdefg", "abcxyzfg", "abc{xx}fg"}, false, true);
test_f("abc(abc|fg)?xyzz", "xyzz");
test_f("abc(abc|fg){0,1}xyzz", "xyzz");
test_f("abc(abc|fg)xyzz|bcdd?k|bc(f|g|h?)z", "", {"abcabcxyzz", "abcfgxyzz", "bcd", "bc"});
@ -43,4 +46,5 @@ TEST(OptimizeRE, analyze)
test_f(R"([Bb]ai[Dd]u[Ss]pider(?:-[A-Za-z]{1,30})(?:-[A-Za-z]{1,30}|)|bingbot|\bYeti(?:-[a-z]{1,30}|)|Catchpoint(?: bot|)|[Cc]harlotte|Daumoa(?:-feedfetcher|)|(?:[a-zA-Z]{1,30}-|)Googlebot(?:-[a-zA-Z]{1,30}|))", "", {"pider-", "bingbot", "Yeti-", "Yeti", "Catchpoint bot", "Catchpoint", "harlotte", "Daumoa-feedfetcher", "Daumoa", "-Googlebot", "Googlebot"});
test_f("abc|(:?xx|yy|zz|x?)def", "", {"abc", "def"});
test_f("abc|(:?xx|yy|zz|x?){1,2}def", "", {"abc", "def"});
test_f(R"(\\A(?:(?:[-0-9_a-z]+(?:\\.[-0-9_a-z]+)*)/k8s1)\\z)", "/k8s1");
}

View File

@ -279,7 +279,17 @@ private:
flush();
if (log_file_settings.max_size != 0)
ftruncate(file_buffer->getFD(), initial_file_size + file_buffer->count());
{
int res = -1;
do
{
res = ftruncate(file_buffer->getFD(), initial_file_size + file_buffer->count());
}
while (res < 0 && errno == EINTR);
if (res != 0)
LOG_WARNING(log, "Could not ftruncate file. Error: {}, errno: {}", errnoToString(), errno);
}
if (log_file_settings.compress_logs)
compressed_buffer.reset();

View File

@ -149,6 +149,7 @@ void KeeperSnapshotManagerS3::uploadSnapshotImpl(const std::string & snapshot_pa
s3_client->client,
s3_client->uri.bucket,
key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings_1
};
};

View File

@ -23,6 +23,8 @@ namespace DB
M(UInt64, io_thread_pool_queue_size, 10000, "Queue size for IO thread pool.", 0) \
M(UInt64, max_outdated_parts_loading_thread_pool_size, 32, "The maximum number of threads that would be used for loading outdated data parts on startup", 0) \
M(UInt64, outdated_part_loading_thread_pool_queue_size, 10000, "Queue size for parts loading thread pool.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited.", 0) \
M(UInt64, max_remote_read_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for read. Zero means unlimited.", 0) \
M(UInt64, max_remote_write_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for write. Zero means unlimited.", 0) \
M(UInt64, max_local_read_bandwidth_for_server, 0, "The maximum speed of local reads in bytes per second. Zero means unlimited.", 0) \

View File

@ -19,7 +19,7 @@ class IColumn;
/** List of settings: type, name, default value, description, flags
*
* This looks rather unconvenient. It is done that way to avoid repeating settings in different places.
* This looks rather inconvenient. It is done that way to avoid repeating settings in different places.
* Note: as an alternative, we could implement settings to be completely dynamic in form of map: String -> Field,
* but we are not going to do it, because settings is used everywhere as static struct fields.
*
@ -101,8 +101,6 @@ class IColumn;
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
M(Bool, replace_running_query, false, "Whether the running request should be canceled with the same id as the new one.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_sends_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated sends. Zero means unlimited. Only has meaning at server startup.", 0) \
M(UInt64, max_remote_read_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for read.", 0) \
M(UInt64, max_remote_write_network_bandwidth, 0, "The maximum speed of data exchange over the network in bytes per second for write.", 0) \
M(UInt64, max_local_read_bandwidth, 0, "The maximum speed of local reads in bytes per second.", 0) \
@ -541,7 +539,6 @@ class IColumn;
M(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
M(Bool, materialize_ttl_after_modify, true, "Apply TTL for old data, after ALTER MODIFY TTL query", 0) \
M(String, function_implementation, "", "Choose function implementation for specific target or variant (experimental). If empty enable all of them.", 0) \
M(Bool, allow_experimental_geo_types, true, "Allow geo data types such as Point, Ring, Polygon, MultiPolygon", 0) \
M(Bool, data_type_default_nullable, false, "Data types without NULL or NOT NULL will make Nullable", 0) \
M(Bool, cast_keep_nullable, false, "CAST operator keep Nullable for result data type", 0) \
M(Bool, cast_ipv4_ipv6_default_on_conversion_error, false, "CAST operator into IPv4, CAST operator into IPV6 type, toIPv4, toIPv6 functions will return default value instead of throwing exception on conversion error.", 0) \
@ -768,6 +765,8 @@ class IColumn;
MAKE_OBSOLETE(M, Bool, allow_experimental_bigint_types, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_window_functions, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_lightweight_delete, true) \
MAKE_OBSOLETE(M, Bool, allow_experimental_geo_types, true) \
\
MAKE_OBSOLETE(M, Milliseconds, async_insert_stale_timeout_ms, 0) \
MAKE_OBSOLETE(M, HandleKafkaErrorMode, handle_kafka_error_mode, HandleKafkaErrorMode::DEFAULT) \
MAKE_OBSOLETE(M, Bool, database_replicated_ddl_output, true) \
@ -792,6 +791,8 @@ class IColumn;
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, background_distributed_schedule_pool_size, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_read_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_write_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_fetches_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_sends_network_bandwidth_for_server, 0) \
/* ---- */ \
MAKE_OBSOLETE(M, DefaultDatabaseEngine, default_database_engine, DefaultDatabaseEngine::Atomic) \
MAKE_OBSOLETE(M, UInt64, max_pipeline_depth, 0) \
@ -913,6 +914,7 @@ class IColumn;
M(Bool, output_format_parquet_fixed_string_as_fixed_byte_array, true, "Use Parquet FIXED_LENGTH_BYTE_ARRAY type instead of Binary for FixedString columns.", 0) \
M(ParquetVersion, output_format_parquet_version, "2.latest", "Parquet format version for output format. Supported versions: 1.0, 2.4, 2.6 and 2.latest (default)", 0) \
M(ParquetCompression, output_format_parquet_compression_method, "lz4", "Compression method for Parquet output format. Supported codecs: snappy, lz4, brotli, zstd, gzip, none (uncompressed)", 0) \
M(Bool, output_format_parquet_compliant_nested_types, true, "In parquet file schema, use name 'element' instead of 'item' for list elements. This is a historical artifact of Arrow library implementation. Generally increases compatibility, except perhaps with some old versions of Arrow.", 0) \
M(String, output_format_avro_codec, "", "Compression codec used for output. Possible values: 'null', 'deflate', 'snappy'.", 0) \
M(UInt64, output_format_avro_sync_interval, 16 * 1024, "Sync interval in bytes.", 0) \
M(String, output_format_avro_string_column_pattern, "", "For Avro format: regexp of String columns to select as AVRO string.", 0) \

View File

@ -80,8 +80,9 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reade to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},
{"parallelize_output_from_storages", false, true, "Allow parallelism when executing queries that read from file/url/s3/etc. This may reorder rows."},
{"output_format_parquet_compliant_nested_types", false, true, "Change an internal field name in output Parquet file schema."}}},
{"23.4", {{"allow_suspicious_indices", true, false, "If true, index can defined with identical expressions"},
{"connect_timeout_with_failover_ms", 50, 1000, "Increase default connect timeout because of async connect"},
{"connect_timeout_with_failover_secure_ms", 100, 1000, "Increase default secure connect timeout because of async connect"},

View File

@ -9,6 +9,7 @@
#include <Common/logger_useful.h>
#include <Common/ConcurrentBoundedQueue.h>
#include <Common/CurrentMetrics.h>
#include <Common/MemoryTrackerBlockerInThread.h>
#include <Core/Defines.h>
@ -21,6 +22,7 @@
#include <Dictionaries/DictionarySource.h>
#include <Dictionaries/DictionaryFactory.h>
#include <Dictionaries/HierarchyDictionariesUtils.h>
#include <Dictionaries/HashedDictionaryCollectionTraits.h>
namespace CurrentMetrics
{
@ -28,24 +30,11 @@ namespace CurrentMetrics
extern const Metric HashedDictionaryThreadsActive;
}
namespace
{
/// NOTE: Trailing return type is explicitly specified for SFINAE.
/// google::sparse_hash_map
template <typename T> auto getKeyFromCell(const T & value) -> decltype(value->first) { return value->first; } // NOLINT
template <typename T> auto getValueFromCell(const T & value) -> decltype(value->second) { return value->second; } // NOLINT
/// HashMap
template <typename T> auto getKeyFromCell(const T & value) -> decltype(value->getKey()) { return value->getKey(); } // NOLINT
template <typename T> auto getValueFromCell(const T & value) -> decltype(value->getMapped()) { return value->getMapped(); } // NOLINT
}
namespace DB
{
using namespace HashedDictionaryImpl;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
@ -80,6 +69,9 @@ public:
shards_queues[shard].emplace(backlog);
pool.scheduleOrThrowOnError([this, shard, thread_group = CurrentThread::getGroup()]
{
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictLoad");
@ -238,14 +230,14 @@ HashedDictionary<dictionary_key_type, sparse, sharded>::~HashedDictionary()
pool.trySchedule([&container, thread_group = CurrentThread::getGroup()]
{
/// Do not account memory that was occupied by the dictionaries for the query/user context.
MemoryTrackerBlockerInThread memory_blocker;
if (thread_group)
CurrentThread::attachToGroupIfDetached(thread_group);
setThreadName("HashedDictDtor");
if constexpr (sparse)
container.clear();
else
container.clearAndShrink();
clearContainer(container);
});
++hash_tables_count;
@ -647,6 +639,8 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::createAttributes()
const auto size = dict_struct.attributes.size();
attributes.reserve(size);
HashTableGrowerWithPrecalculationAndMaxLoadFactor grower(configuration.max_load_factor);
for (const auto & dictionary_attribute : dict_struct.attributes)
{
auto type_call = [&, this](const auto & dictionary_attribute_type)
@ -656,8 +650,28 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::createAttributes()
using ValueType = DictionaryValueType<AttributeType>;
auto is_nullable_sets = dictionary_attribute.is_nullable ? std::make_optional<NullableSets>(configuration.shards) : std::optional<NullableSets>{};
Attribute attribute{dictionary_attribute.underlying_type, std::move(is_nullable_sets), CollectionsHolder<ValueType>(configuration.shards)};
attributes.emplace_back(std::move(attribute));
if constexpr (IsBuiltinHashTable<typename CollectionsHolder<ValueType>::value_type>)
{
CollectionsHolder<ValueType> collections;
collections.reserve(configuration.shards);
for (size_t i = 0; i < configuration.shards; ++i)
collections.emplace_back(grower);
Attribute attribute{dictionary_attribute.underlying_type, std::move(is_nullable_sets), std::move(collections)};
attributes.emplace_back(std::move(attribute));
}
else
{
Attribute attribute{dictionary_attribute.underlying_type, std::move(is_nullable_sets), CollectionsHolder<ValueType>(configuration.shards)};
for (auto & container : std::get<CollectionsHolder<ValueType>>(attribute.containers))
container.max_load_factor(configuration.max_load_factor);
attributes.emplace_back(std::move(attribute));
}
if constexpr (IsBuiltinHashTable<typename CollectionsHolder<ValueType>::value_type>)
LOG_TRACE(log, "Using builtin hash table for {} attribute", dictionary_attribute.name);
else
LOG_TRACE(log, "Using sparsehash for {} attribute", dictionary_attribute.name);
};
callOnDictionaryAttributeType(dictionary_attribute.underlying_type, type_call);
@ -665,7 +679,9 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::createAttributes()
if (unlikely(attributes.size()) == 0)
{
no_attributes_containers.resize(configuration.shards);
no_attributes_containers.reserve(configuration.shards);
for (size_t i = 0; i < configuration.shards; ++i)
no_attributes_containers.emplace_back(grower);
}
string_arenas.resize(configuration.shards);
@ -834,12 +850,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::resize(size_t added
if (unlikely(attributes_size == 0))
{
size_t reserve_size = added_rows + no_attributes_containers.front().size();
if constexpr (sparse)
no_attributes_containers.front().resize(reserve_size);
else
no_attributes_containers.front().reserve(reserve_size);
resizeContainer(no_attributes_containers.front(), reserve_size);
return;
}
@ -849,11 +860,7 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::resize(size_t added
{
auto & container = containers.front();
size_t reserve_size = added_rows + container.size();
if constexpr (sparse)
container.resize(reserve_size);
else
container.reserve(reserve_size);
resizeContainer(container, reserve_size);
});
}
}
@ -973,25 +980,9 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
{
for (const auto & container : containers)
{
using ContainerType = std::decay_t<decltype(container)>;
using AttributeValueType = typename ContainerType::mapped_type;
bytes_allocated += sizeof(container);
if constexpr (sparse || std::is_same_v<AttributeValueType, Field>)
{
/// bucket_count() - Returns table size, that includes empty and deleted
/// size() - Returns table size, without empty and deleted
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
bytes_allocated += container.size() * (sizeof(KeyType) + sizeof(AttributeValueType));
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count += container.getBufferSizeInCells();
}
bytes_allocated += getBufferSizeInBytes(container);
bucket_count += getBufferSizeInCells(container);
}
});
@ -1010,17 +1001,8 @@ void HashedDictionary<dictionary_key_type, sparse, sharded>::calculateBytesAlloc
for (const auto & container : no_attributes_containers)
{
bytes_allocated += sizeof(container);
if constexpr (sparse)
{
bytes_allocated += container.size() * (sizeof(KeyType));
bucket_count += container.bucket_count();
}
else
{
bytes_allocated += container.getBufferSizeInBytes();
bucket_count += container.getBufferSizeInCells();
}
bytes_allocated += getBufferSizeInBytes(container);
bucket_count += getBufferSizeInCells(container);
}
}
@ -1078,12 +1060,7 @@ Pipe HashedDictionary<dictionary_key_type, sparse, sharded>::read(const Names &
keys.reserve(keys.size() + container.size());
for (const auto & key : container)
{
if constexpr (sparse)
keys.emplace_back(key);
else
keys.emplace_back(key.getKey());
}
keys.emplace_back(getSetKeyFromCell(key));
}
}
@ -1192,9 +1169,14 @@ void registerDictionaryHashed(DictionaryFactory & factory)
if (shard_load_queue_backlog <= 0)
throw Exception(ErrorCodes::BAD_ARGUMENTS,"{}: SHARD_LOAD_QUEUE_BACKLOG parameter should be greater then zero", full_name);
float max_load_factor = static_cast<float>(config.getDouble(config_prefix + dictionary_layout_prefix + ".max_load_factor", 0.5));
if (max_load_factor < 0.5f || max_load_factor > 0.99f)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "{}: max_load_factor parameter should be within [0.5, 0.99], got {}", full_name, max_load_factor);
HashedDictionaryConfiguration configuration{
static_cast<UInt64>(shards),
static_cast<UInt64>(shard_load_queue_backlog),
max_load_factor,
require_nonempty,
dict_lifetime,
};

View File

@ -4,17 +4,14 @@
#include <memory>
#include <variant>
#include <optional>
#include <sparsehash/sparse_hash_map>
#include <sparsehash/sparse_hash_set>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Core/Block.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/IDictionarySource.h>
#include <Dictionaries/DictionaryHelpers.h>
#include <Dictionaries/HashedDictionaryCollectionType.h>
/** This dictionary stores all content in a hash table in memory
* (a separate Key -> Value map for each attribute)
@ -28,6 +25,7 @@ struct HashedDictionaryConfiguration
{
const UInt64 shards;
const UInt64 shard_load_queue_backlog;
const float max_load_factor;
const bool require_nonempty;
const DictionaryLifetime lifetime;
};
@ -136,42 +134,7 @@ public:
private:
template <typename Value>
using CollectionTypeNonSparse = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashMap<UInt64, Value, DefaultHash<UInt64>>,
HashMapWithSavedHash<StringRef, Value, DefaultHash<StringRef>>>;
using NoAttributesCollectionTypeNonSparse = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashSet<UInt64, DefaultHash<UInt64>>,
HashSetWithSavedHash<StringRef, DefaultHash<StringRef>>>;
/// Here we use sparse_hash_map with DefaultHash<> for the following reasons:
///
/// - DefaultHash<> is used for HashMap
/// - DefaultHash<> (from HashTable/Hash.h> works better then std::hash<>
/// in case of sequential set of keys, but with random access to this set, i.e.
///
/// SELECT number FROM numbers(3000000) ORDER BY rand()
///
/// And even though std::hash<> works better in some other cases,
/// DefaultHash<> is preferred since the difference for this particular
/// case is significant, i.e. it can be 10x+.
template <typename Value>
using CollectionTypeSparse = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
google::sparse_hash_map<UInt64, Value, DefaultHash<KeyType>>,
google::sparse_hash_map<StringRef, Value, DefaultHash<KeyType>>>;
using NoAttributesCollectionTypeSparse = google::sparse_hash_set<KeyType, DefaultHash<KeyType>>;
template <typename Value>
using CollectionType = std::conditional_t<sparse, CollectionTypeSparse<Value>, CollectionTypeNonSparse<Value>>;
template <typename Value>
using CollectionsHolder = std::vector<CollectionType<Value>>;
using NoAttributesCollectionType = std::conditional_t<sparse, NoAttributesCollectionTypeSparse, NoAttributesCollectionTypeNonSparse>;
using CollectionsHolder = std::vector<typename HashedDictionaryImpl::HashedDictionaryMapType<dictionary_key_type, sparse, KeyType, Value>::Type>;
using NullableSet = HashSet<KeyType, DefaultHash<KeyType>>;
using NullableSets = std::vector<NullableSet>;
@ -269,7 +232,7 @@ private:
BlockPtr update_field_loaded_block;
std::vector<std::unique_ptr<Arena>> string_arenas;
std::vector<NoAttributesCollectionType> no_attributes_containers;
std::vector<typename HashedDictionaryImpl::HashedDictionarySetType<dictionary_key_type, sparse, KeyType>::Type> no_attributes_containers;
DictionaryHierarchicalParentToChildIndexPtr hierarchical_index;
};

View File

@ -0,0 +1,107 @@
#pragma once
#include <type_traits>
#include <sparsehash/sparse_hash_map>
#include <Common/HashTable/Hash.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/PackedHashMap.h>
namespace DB
{
namespace HashedDictionaryImpl
{
/// sparse_hash_map/sparse_hash_set
template <typename C>
concept IsGoogleSparseHashTable = std::is_same_v<C, google::sparse_hash_map<
typename C::key_type,
typename C::mapped_type,
/// HashFcn is not exported in sparse_hash_map is public type
DefaultHash<typename C::key_type>>>;
template <typename V>
concept IsStdMapCell = requires (V v)
{
v->first;
v->second;
};
/// HashMap/HashMapWithSavedHash/HashSet/HashMapWithSavedHash/PackedHashMap and their Cells
template <typename C>
concept IsBuiltinHashTable = (
std::is_same_v<C, HashMapWithSavedHash<
typename C::key_type,
typename C::mapped_type,
DefaultHash<typename C::key_type>,
typename C::grower_type>> ||
std::is_same_v<C, HashMap<
typename C::key_type,
typename C::mapped_type,
DefaultHash<typename C::key_type>,
typename C::grower_type>> ||
std::is_same_v<C, PackedHashMap<
typename C::key_type,
typename C::mapped_type,
DefaultHash<typename C::key_type>,
typename C::grower_type>> ||
std::is_same_v<C, HashSetWithSavedHash<
typename C::key_type,
DefaultHash<typename C::key_type>,
typename C::grower_type>> ||
std::is_same_v<C, HashSet<
typename C::key_type,
DefaultHash<typename C::key_type>,
typename C::grower_type>>
);
template <typename V>
concept IsBuiltinSetCell = requires (V v)
{
v.getKey();
};
template <typename V>
concept IsBuiltinMapCell = requires (V v)
{
v->getKey();
v->getMapped();
};
// NOLINTBEGIN(*)
/// google::sparse_hash_map
template <typename T> auto getSetKeyFromCell(const T & value) { return value; }
template <typename T> auto getKeyFromCell(const T & value) requires (IsStdMapCell<T>) { return value->first; }
template <typename T> auto getValueFromCell(const T & value) requires (IsStdMapCell<T>) { return value->second; }
/// size() - returns table size, without empty and deleted
/// and since this is sparsehash, empty cells should not be significant,
/// and since items cannot be removed from the dictionary, deleted is also not important.
///
/// NOTE: for google::sparse_hash_set value_type is Key, for sparse_hash_map
/// value_type is std::pair<Key, Value>, and now we correctly takes into
/// account padding in structures, if any.
template <typename C> auto getBufferSizeInBytes(const C & c) requires (IsGoogleSparseHashTable<C>) { return c.size() * sizeof(typename C::value_type); }
/// bucket_count() - Returns table size, that includes empty and deleted
template <typename C> auto getBufferSizeInCells(const C & c) requires (IsGoogleSparseHashTable<C>) { return c.bucket_count(); }
template <typename C> auto resizeContainer(C & c, size_t size) requires (IsGoogleSparseHashTable<C>) { return c.resize(size); }
template <typename C> auto clearContainer(C & c) requires (IsGoogleSparseHashTable<C>) { return c.clear(); }
/// HashMap
template <typename T> auto getSetKeyFromCell(const T & value) requires (IsBuiltinSetCell<T>) { return value.getKey(); }
template <typename T> auto getKeyFromCell(const T & value) requires (IsBuiltinMapCell<T>) { return value->getKey(); }
template <typename T> auto getValueFromCell(const T & value) requires (IsBuiltinMapCell<T>) { return value->getMapped(); }
template <typename C> auto getBufferSizeInBytes(const C & c) requires (IsBuiltinHashTable<C>) { return c.getBufferSizeInBytes(); }
template <typename C> auto getBufferSizeInCells(const C & c) requires (IsBuiltinHashTable<C>) { return c.getBufferSizeInCells(); }
template <typename C> auto resizeContainer(C & c, size_t size) requires (IsBuiltinHashTable<C>) { return c.reserve(size); }
template <typename C> void clearContainer(C & c) requires (IsBuiltinHashTable<C>) { return c.clearAndShrink(); }
// NOLINTEND(*)
}
}

View File

@ -0,0 +1,262 @@
#pragma once
#include <Dictionaries/IDictionary.h>
#include <Common/HashTable/PackedHashMap.h>
#include <Common/HashTable/HashMap.h>
#include <Common/HashTable/HashSet.h>
#include <Core/Types_fwd.h>
#include <sparsehash/sparse_hash_map>
#include <sparsehash/sparse_hash_set>
#include <type_traits>
namespace DB
{
namespace HashedDictionaryImpl
{
/// Return true if the type is POD [1] for the purpose of layout (this is not
/// the same as STL traits has).
///
/// [1]: https://stackoverflow.com/questions/4178175/what-are-aggregates-and-pods-and-how-why-are-they-special/4178176#4178176
///
/// The behaviour had been change in clang-16, see this for more details:
/// - https://github.com/llvm/llvm-project/commit/a8b0c6fa28acced71db33e80bd0b51d00422035b
/// - https://github.com/llvm/llvm-project/commit/277123376ce08c98b07c154bf83e4092a5d4d3c6
/// - https://github.com/llvm/llvm-project/issues/62422
/// - https://github.com/llvm/llvm-project/issues/62353
/// - https://github.com/llvm/llvm-project/issues/62358
template <typename V>
constexpr bool isPodLayout()
{
if constexpr (std::is_same_v<V, UUID>)
return false;
if constexpr (std::is_same_v<V, DateTime64>)
return false;
if constexpr (std::is_same_v<V, Decimal32> || std::is_same_v<V, Decimal64> || std::is_same_v<V, Decimal128> || std::is_same_v<V, Decimal256>)
return false;
if constexpr (std::is_same_v<V, StringRef>)
return false;
if constexpr (std::is_same_v<V, IPv6> || std::is_same_v<V, IPv4>)
return false;
return true;
}
/// HashMap with packed structure is better than google::sparse_hash_map if the
/// <K, V> pair is small, for the sizeof(std::pair<K, V>) == 16, RSS for hash
/// table with 1e9 elements will be:
///
/// - google::sparse_hash_map : 26GiB
/// - HashMap : 35GiB
/// - PackedHashMap : 22GiB
/// - google::sparse_hash_map<packed_pair>: 17GiB
///
/// Also note here sizeof(std::pair<>) was used since google::sparse_hash_map
/// uses it to store <K, V>, yes we can modify google::sparse_hash_map to work
/// with packed analog of std::pair, but the allocator overhead is still
/// significant, because of tons of reallocations (and those cannot be solved
/// with reserve() due to some internals of google::sparse_hash_map) and poor
/// jemalloc support of such pattern, which results in 33% fragmentation (in
/// comparison with glibc).
///
/// Plus since google::sparse_hash_map cannot use packed structure, it will
/// have the same memory footprint for everything from UInt8 to UInt64 values
/// and so on.
///
/// Returns true hen google::sparse_hash_map should be used, otherwise
/// PackedHashMap should be used instead.
template <typename K, typename V>
constexpr bool useSparseHashForHashedDictionary()
{
if constexpr (!isPodLayout<K>())
return true;
if constexpr (!isPodLayout<V>())
return true;
/// NOTE: One should not use PackedPairNoInit<K, V> here since this will
/// create instantion of this type, and it could be illformed.
return sizeof(V) > 8;
}
/// Grower with custom fill limit/load factor (instead of default 50%).
///
/// Based on HashTableGrowerWithPrecalculation
template <size_t initial_size_degree = 8>
class alignas(64) HashTableGrowerWithPrecalculationAndMaxLoadFactor
{
UInt8 size_degree = initial_size_degree;
size_t precalculated_mask = (1ULL << initial_size_degree) - 1;
size_t precalculated_max_fill = 1ULL << (initial_size_degree - 1);
float max_load_factor = 0.5;
/// HashTableGrowerWithPrecalculation has 23, but to decrease memory usage
/// at least slightly 19 is used here. Also note, that for dictionaries it
/// is not that important since they are not that frequently loaded.
static constexpr size_t max_size_degree_quadratic = 19;
public:
static constexpr auto initial_count = 1ULL << initial_size_degree;
/// If collision resolution chains are contiguous, we can implement erase operation by moving the elements.
static constexpr auto performs_linear_probing_with_single_step = true;
HashTableGrowerWithPrecalculationAndMaxLoadFactor() = default;
explicit HashTableGrowerWithPrecalculationAndMaxLoadFactor(float max_load_factor_)
: max_load_factor(max_load_factor_)
{
increaseSizeDegree(0);
}
UInt8 sizeDegree() const { return size_degree; }
void increaseSizeDegree(UInt8 delta)
{
size_degree += delta;
precalculated_mask = (1ULL << size_degree) - 1;
precalculated_max_fill = static_cast<size_t>((1ULL << size_degree) * max_load_factor);
}
/// The size of the hash table in the cells.
size_t bufSize() const { return 1ULL << size_degree; }
/// From the hash value, get the cell number in the hash table.
size_t place(size_t x) const { return x & precalculated_mask; }
/// The next cell in the collision resolution chain.
size_t next(size_t pos) const { return (pos + 1) & precalculated_mask; }
/// Whether the hash table is sufficiently full. You need to increase the size of the hash table, or remove something unnecessary from it.
bool overflow(size_t elems) const { return elems > precalculated_max_fill; }
/// Increase the size of the hash table.
void increaseSize() { increaseSizeDegree(size_degree >= max_size_degree_quadratic ? 1 : 2); }
/// Set the buffer size by the number of elements in the hash table. Used when deserializing a hash table.
void set(size_t num_elems)
{
if (num_elems <= 1)
size_degree = initial_size_degree;
else if (initial_size_degree > static_cast<size_t>(log2(num_elems - 1)) + 2)
size_degree = initial_size_degree;
else
{
/// Slightly more optimal than HashTableGrowerWithPrecalculation
/// and takes into account max_load_factor.
size_degree = static_cast<size_t>(log2(num_elems - 1)) + 1;
if ((1ULL << size_degree) * max_load_factor < num_elems)
++size_degree;
}
increaseSizeDegree(0);
}
void setBufSize(size_t buf_size_)
{
size_degree = static_cast<size_t>(log2(buf_size_ - 1) + 1);
increaseSizeDegree(0);
}
};
static_assert(sizeof(HashTableGrowerWithPrecalculationAndMaxLoadFactor<>) == 64);
/// Above goes various specialisations for the hash table that will be used for
/// HASHED/SPARSE_HASHED dictionary, it could use one of the following depends
/// on the layout of the dictionary and types of key/value (for more info see
/// comments in this file):
/// - HashMap
/// - HashSet
/// - HashMapWithSavedHash
/// - HashSetWithSavedHash
/// - PackedHashMap
/// - google::sparse_hash_map
///
/// Map (dictionary with attributes)
///
/// Type of the hash table for the dictionary.
template <DictionaryKeyType dictionary_key_type, bool sparse, typename Key, typename Value>
struct HashedDictionaryMapType;
/// Default implementation using builtin HashMap (for HASHED layout).
template <DictionaryKeyType dictionary_key_type, typename Key, typename Value>
struct HashedDictionaryMapType<dictionary_key_type, /* sparse= */ false, Key, Value>
{
using Type = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashMap<UInt64, Value, DefaultHash<UInt64>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>,
HashMapWithSavedHash<StringRef, Value, DefaultHash<StringRef>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>>;
};
/// Implementations for SPARSE_HASHED layout.
template <DictionaryKeyType dictionary_key_type, typename Key, typename Value, bool use_sparse_hash>
struct HashedDictionarySparseMapType;
/// Implementation based on google::sparse_hash_map for SPARSE_HASHED.
template <DictionaryKeyType dictionary_key_type, typename Key, typename Value>
struct HashedDictionarySparseMapType<dictionary_key_type, Key, Value, /* use_sparse_hash= */ true>
{
/// Here we use sparse_hash_map with DefaultHash<> for the following reasons:
///
/// - DefaultHash<> is used for HashMap
/// - DefaultHash<> (from HashTable/Hash.h> works better then std::hash<>
/// in case of sequential set of keys, but with random access to this set, i.e.
///
/// SELECT number FROM numbers(3000000) ORDER BY rand()
///
/// And even though std::hash<> works better in some other cases,
/// DefaultHash<> is preferred since the difference for this particular
/// case is significant, i.e. it can be 10x+.
using Type = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
google::sparse_hash_map<UInt64, Value, DefaultHash<Key>>,
google::sparse_hash_map<StringRef, Value, DefaultHash<Key>>>;
};
/// Implementation based on PackedHashMap for SPARSE_HASHED.
template <DictionaryKeyType dictionary_key_type, typename Key, typename Value>
struct HashedDictionarySparseMapType<dictionary_key_type, Key, Value, /* use_sparse_hash= */ false>
{
using Type = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
PackedHashMap<UInt64, Value, DefaultHash<UInt64>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>,
PackedHashMap<StringRef, Value, DefaultHash<StringRef>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>>;
};
template <DictionaryKeyType dictionary_key_type, typename Key, typename Value>
struct HashedDictionaryMapType<dictionary_key_type, /* sparse= */ true, Key, Value>
: public HashedDictionarySparseMapType<
dictionary_key_type, Key, Value,
/* use_sparse_hash= */ useSparseHashForHashedDictionary<Key, Value>()>
{};
///
/// Set (dictionary with attributes)
///
/// Type of the hash table for the dictionary.
template <DictionaryKeyType dictionary_key_type, bool sparse, typename Key>
struct HashedDictionarySetType;
/// Default implementation using builtin HashMap (for HASHED layout).
template <DictionaryKeyType dictionary_key_type, typename Key>
struct HashedDictionarySetType<dictionary_key_type, /* sparse= */ false, Key>
{
using Type = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashSet<UInt64, DefaultHash<UInt64>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>,
HashSetWithSavedHash<StringRef, DefaultHash<StringRef>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>>;
};
/// Implementation for SPARSE_HASHED.
///
/// NOTE: There is no implementation based on google::sparse_hash_set since
/// PackedHashMap is more optimal anyway (see comments for
/// useSparseHashForHashedDictionary()).
template <DictionaryKeyType dictionary_key_type, typename Key>
struct HashedDictionarySetType<dictionary_key_type, /* sparse= */ true, Key>
{
using Type = std::conditional_t<
dictionary_key_type == DictionaryKeyType::Simple,
HashSet<UInt64, DefaultHash<UInt64>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>,
HashSet<StringRef, DefaultHash<StringRef>, HashTableGrowerWithPrecalculationAndMaxLoadFactor<>>>;
};
}
}

View File

@ -156,11 +156,11 @@ void buildLayoutConfiguration(
const auto value_field = value_literal->value;
if (value_field.getType() != Field::Types::UInt64 && value_field.getType() != Field::Types::String)
if (value_field.getType() != Field::Types::UInt64 && value_field.getType() != Field::Types::Float64 && value_field.getType() != Field::Types::String)
{
throw DB::Exception(
ErrorCodes::BAD_ARGUMENTS,
"Dictionary layout parameter value must be an UInt64 or String, got '{}' instead",
"Dictionary layout parameter value must be an UInt64, Float64 or String, got '{}' instead",
value_field.getTypeName());
}

View File

@ -26,7 +26,7 @@ WriteBufferFromAzureBlobStorage::WriteBufferFromAzureBlobStorage(
size_t max_single_part_upload_size_,
size_t buf_size_,
const WriteSettings & write_settings_)
: BufferWithOwnMemory<WriteBuffer>(buf_size_, nullptr, 0)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, log(&Poco::Logger::get("WriteBufferFromAzureBlobStorage"))
, max_single_part_upload_size(max_single_part_upload_size_)
, blob_path(blob_path_)
@ -51,7 +51,7 @@ void WriteBufferFromAzureBlobStorage::execWithRetry(std::function<void()> func,
if (i == num_tries - 1)
throw;
LOG_DEBUG(log, "Write at attempt {} for blob `{}` failed: {}", i + 1, blob_path, e.Message);
LOG_DEBUG(log, "Write at attempt {} for blob `{}` failed: {} {}", i + 1, blob_path, e.what(), e.Message);
};
for (size_t i = 0; i < num_tries; ++i)

View File

@ -6,7 +6,7 @@
#include <memory>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <azure/storage/blobs.hpp>
@ -21,7 +21,7 @@ class Logger;
namespace DB
{
class WriteBufferFromAzureBlobStorage : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromAzureBlobStorage : public WriteBufferFromFileBase
{
public:
using AzureClientPtr = std::shared_ptr<const Azure::Storage::Blobs::BlobContainerClient>;
@ -37,6 +37,9 @@ public:
void nextImpl() override;
std::string getFileName() const override { return blob_path; }
void sync() override { next(); }
private:
void finalizeImpl() override;
void execWithRetry(std::function<void()> func, size_t num_tries, size_t cost = 0);

View File

@ -1,11 +1,11 @@
#include "WriteIndirectBufferFromRemoteFS.h"
#include "WriteBufferWithFinalizeCallback.h"
namespace DB
{
WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS(
WriteBufferWithFinalizeCallback::WriteBufferWithFinalizeCallback(
std::unique_ptr<WriteBuffer> impl_,
CreateMetadataCallback && create_callback_,
FinalizeCallback && create_callback_,
const String & remote_path_)
: WriteBufferFromFileDecorator(std::move(impl_))
, create_metadata_callback(std::move(create_callback_))
@ -14,7 +14,7 @@ WriteIndirectBufferFromRemoteFS::WriteIndirectBufferFromRemoteFS(
}
WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS()
WriteBufferWithFinalizeCallback::~WriteBufferWithFinalizeCallback()
{
try
{
@ -26,7 +26,7 @@ WriteIndirectBufferFromRemoteFS::~WriteIndirectBufferFromRemoteFS()
}
}
void WriteIndirectBufferFromRemoteFS::finalizeImpl()
void WriteBufferWithFinalizeCallback::finalizeImpl()
{
WriteBufferFromFileDecorator::finalizeImpl();
if (create_metadata_callback)

View File

@ -8,25 +8,25 @@
namespace DB
{
using CreateMetadataCallback = std::function<void(size_t bytes_count)>;
using FinalizeCallback = std::function<void(size_t bytes_count)>;
/// Stores data in S3/HDFS and adds the object path and object size to metadata file on local FS.
class WriteIndirectBufferFromRemoteFS final : public WriteBufferFromFileDecorator
class WriteBufferWithFinalizeCallback final : public WriteBufferFromFileDecorator
{
public:
WriteIndirectBufferFromRemoteFS(
WriteBufferWithFinalizeCallback(
std::unique_ptr<WriteBuffer> impl_,
CreateMetadataCallback && create_callback_,
FinalizeCallback && create_callback_,
const String & remote_path_);
~WriteIndirectBufferFromRemoteFS() override;
~WriteBufferWithFinalizeCallback() override;
String getFileName() const override { return remote_path; }
private:
void finalizeImpl() override;
CreateMetadataCallback create_metadata_callback;
FinalizeCallback create_metadata_callback;
String remote_path;
};

View File

@ -129,7 +129,6 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes>,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
@ -138,14 +137,12 @@ std::unique_ptr<WriteBufferFromFileBase> AzureObjectStorage::writeObject( /// NO
LOG_TEST(log, "Writing file: {}", object.remote_path);
auto buffer = std::make_unique<WriteBufferFromAzureBlobStorage>(
return std::make_unique<WriteBufferFromAzureBlobStorage>(
client.get(),
object.remote_path,
settings.get()->max_single_part_upload_size,
buf_size,
patchSettings(write_settings));
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(buffer), std::move(finalize_callback), object.remote_path);
}
void AzureObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const
@ -282,7 +279,6 @@ void AzureObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguratio
{
auto new_settings = getAzureBlobStorageSettings(config, config_prefix, context);
settings.set(std::move(new_settings));
applyRemoteThrottlingSettings(context);
/// We don't update client
}

View File

@ -7,7 +7,6 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/MultiVersion.h>
@ -83,7 +82,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -97,13 +97,12 @@ std::unique_ptr<WriteBufferFromFileBase> CachedObjectStorage::writeObject( /// N
const StoredObject & object,
WriteMode mode, // Cached doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
/// Add cache relating settings to WriteSettings.
auto modified_write_settings = IObjectStorage::patchSettings(write_settings);
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, std::move(finalize_callback), buf_size, modified_write_settings);
auto implementation_buffer = object_storage->writeObject(object, mode, attributes, buf_size, modified_write_settings);
bool cache_on_write = modified_write_settings.enable_filesystem_cache_on_write_operations
&& FileCacheFactory::instance().getByName(cache_config_name).settings.cache_on_write_operations

View File

@ -43,7 +43,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -1,5 +1,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageTransaction.h>
#include <Disks/ObjectStorages/DiskObjectStorage.h>
#include <Disks/IO/WriteBufferWithFinalizeCallback.h>
#include <Common/checkStackSize.h>
#include <ranges>
#include <Common/logger_useful.h>
@ -658,14 +659,16 @@ std::unique_ptr<WriteBufferFromFileBase> DiskObjectStorageTransaction::writeFile
operations_to_execute.emplace_back(std::move(write_operation));
/// We always use mode Rewrite because we simulate append using metadata and different files
return object_storage.writeObject(
auto impl = object_storage.writeObject(
object,
/// We always use mode Rewrite because we simulate append using metadata and different files
WriteMode::Rewrite,
object_attributes,
std::move(create_metadata_callback),
buf_size,
settings);
return std::make_unique<WriteBufferWithFinalizeCallback>(
std::move(impl), std::move(create_metadata_callback), object.remote_path);
}

View File

@ -9,7 +9,6 @@
#include <Storages/HDFS/ReadBufferFromHDFS.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Common/getRandomASCIIString.h>
@ -83,7 +82,6 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & write_settings)
{
@ -93,11 +91,9 @@ std::unique_ptr<WriteBufferFromFileBase> HDFSObjectStorage::writeObject( /// NOL
"HDFS API doesn't support custom attributes/metadata for stored objects");
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(
return std::make_unique<WriteBufferFromHDFS>(
object.remote_path, config, settings->replication, patchSettings(write_settings), buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(std::move(hdfs_buffer), std::move(finalize_callback), object.remote_path);
}
@ -156,11 +152,6 @@ void HDFSObjectStorage::copyObject( /// NOLINT
}
void HDFSObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration &, const std::string &, ContextPtr context)
{
applyRemoteThrottlingSettings(context);
}
std::unique_ptr<IObjectStorage> HDFSObjectStorage::cloneObjectStorage(const std::string &, const Poco::Util::AbstractConfiguration &, const std::string &, ContextPtr)
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "HDFS object storage doesn't support cloning");

View File

@ -81,7 +81,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
@ -105,11 +104,6 @@ public:
void startup() override;
void applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context) override;
String getObjectsNamespace() const override { return ""; }
std::unique_ptr<IObjectStorage> cloneObjectStorage(

View File

@ -55,27 +55,16 @@ const std::string & IObjectStorage::getCacheName() const
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getCacheName() is not implemented for object storage");
}
void IObjectStorage::applyRemoteThrottlingSettings(ContextPtr context)
{
std::unique_lock lock{throttlers_mutex};
remote_read_throttler = context->getRemoteReadThrottler();
remote_write_throttler = context->getRemoteWriteThrottler();
}
ReadSettings IObjectStorage::patchSettings(const ReadSettings & read_settings) const
{
std::unique_lock lock{throttlers_mutex};
ReadSettings settings{read_settings};
settings.remote_throttler = remote_read_throttler;
settings.for_object_storage = true;
return settings;
}
WriteSettings IObjectStorage::patchSettings(const WriteSettings & write_settings) const
{
std::unique_lock lock{throttlers_mutex};
WriteSettings settings{write_settings};
settings.remote_throttler = remote_write_throttler;
settings.for_object_storage = true;
return settings;
}

View File

@ -48,8 +48,6 @@ struct ObjectMetadata
std::optional<ObjectAttributes> attributes;
};
using FinalizeCallback = std::function<void(size_t bytes_count)>;
/// Base class for all object storages which implement some subset of ordinary filesystem operations.
///
/// Examples of object storages are S3, Azure Blob Storage, HDFS.
@ -119,7 +117,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) = 0;
@ -165,9 +162,10 @@ public:
/// Apply new settings, in most cases reiniatilize client and some other staff
virtual void applyNewSettings(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
ContextPtr context) = 0;
const Poco::Util::AbstractConfiguration &,
const std::string & /*config_prefix*/,
ContextPtr)
{}
/// Sometimes object storages have something similar to chroot or namespace, for example
/// buckets in S3. If object storage doesn't have any namepaces return empty string.
@ -205,10 +203,6 @@ public:
virtual WriteSettings patchSettings(const WriteSettings & write_settings) const;
protected:
/// Should be called from implementation of applyNewSettings()
void applyRemoteThrottlingSettings(ContextPtr context);
private:
mutable std::mutex throttlers_mutex;
ThrottlerPtr remote_read_throttler;

View File

@ -7,7 +7,6 @@
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/createReadBufferFromFileBase.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <IO/SeekAvoidingReadBuffer.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/copyData.h>
@ -124,7 +123,6 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> /* attributes */,
FinalizeCallback && finalize_callback,
size_t buf_size,
const WriteSettings & /* write_settings */)
{
@ -132,9 +130,7 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
throw Exception(ErrorCodes::BAD_ARGUMENTS, "LocalObjectStorage doesn't support append to files");
LOG_TEST(log, "Write object: {}", object.remote_path);
auto impl = std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
std::move(impl), std::move(finalize_callback), object.remote_path);
return std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
}
void LocalObjectStorage::removeObject(const StoredObject & object)

View File

@ -41,7 +41,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -8,7 +8,6 @@
#include <Disks/ObjectStorages/DiskObjectStorageCommon.h>
#include <Disks/IO/AsynchronousReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <IO/WriteBufferFromS3.h>
#include <IO/ReadBufferFromS3.h>
@ -160,8 +159,7 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
const StoredObject & object,
WriteMode mode, // S3 doesn't support append, only rewrite
std::optional<ObjectAttributes> attributes,
FinalizeCallback && finalize_callback,
size_t buf_size [[maybe_unused]],
size_t buf_size,
const WriteSettings & write_settings)
{
WriteSettings disk_write_settings = IObjectStorage::patchSettings(write_settings);
@ -174,17 +172,15 @@ std::unique_ptr<WriteBufferFromFileBase> S3ObjectStorage::writeObject( /// NOLIN
if (write_settings.s3_allow_parallel_part_upload)
scheduler = threadPoolCallbackRunner<void>(getThreadPoolWriter(), "VFSWrite");
auto s3_buffer = std::make_unique<WriteBufferFromS3>(
return std::make_unique<WriteBufferFromS3>(
client.get(),
bucket,
object.remote_path,
buf_size,
settings_ptr->request_settings,
attributes,
std::move(scheduler),
disk_write_settings);
return std::make_unique<WriteIndirectBufferFromRemoteFS>(
std::move(s3_buffer), std::move(finalize_callback), object.remote_path);
}
void S3ObjectStorage::findAllFiles(const std::string & path, RelativePathsWithSize & children, int max_keys) const
@ -443,7 +439,6 @@ void S3ObjectStorage::applyNewSettings(const Poco::Util::AbstractConfiguration &
auto new_client = getClient(config, config_prefix, context, *new_s3_settings);
s3_settings.set(std::move(new_s3_settings));
client.set(std::move(new_client));
applyRemoteThrottlingSettings(context);
}
std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(

View File

@ -97,7 +97,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -9,7 +9,6 @@
#include <IO/WriteHelpers.h>
#include <Disks/IO/ReadIndirectBufferFromRemoteFS.h>
#include <Disks/IO/WriteIndirectBufferFromRemoteFS.h>
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
@ -211,7 +210,6 @@ std::unique_ptr<WriteBufferFromFileBase> WebObjectStorage::writeObject( /// NOLI
const StoredObject & /* object */,
WriteMode /* mode */,
std::optional<ObjectAttributes> /* attributes */,
FinalizeCallback && /* finalize_callback */,
size_t /* buf_size */,
const WriteSettings & /* write_settings */)
{

View File

@ -51,7 +51,6 @@ public:
const StoredObject & object,
WriteMode mode,
std::optional<ObjectAttributes> attributes = {},
FinalizeCallback && finalize_callback = {},
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;

View File

@ -122,6 +122,7 @@ FormatSettings getFormatSettings(ContextPtr context, const Settings & settings)
format_settings.parquet.output_fixed_string_as_fixed_byte_array = settings.output_format_parquet_fixed_string_as_fixed_byte_array;
format_settings.parquet.max_block_size = settings.input_format_parquet_max_block_size;
format_settings.parquet.output_compression_method = settings.output_format_parquet_compression_method;
format_settings.parquet.output_compliant_nested_types = settings.output_format_parquet_compliant_nested_types;
format_settings.pretty.charset = settings.output_format_pretty_grid_charset.toString() == "ASCII" ? FormatSettings::Pretty::Charset::ASCII : FormatSettings::Pretty::Charset::UTF8;
format_settings.pretty.color = settings.output_format_pretty_color;
format_settings.pretty.max_column_pad_width = settings.output_format_pretty_max_column_pad_width;

View File

@ -220,6 +220,7 @@ struct FormatSettings
UInt64 max_block_size = 8192;
ParquetVersion output_version;
ParquetCompression output_compression_method = ParquetCompression::SNAPPY;
bool output_compliant_nested_types = true;
} parquet;
struct Pretty

View File

@ -90,7 +90,7 @@ public:
return std::make_shared<DataTypeNumber<ResultType>>();
}
template <typename T, typename U>
template <typename ResultType, typename T, typename U>
static ResultType apply(
const T * scores,
const U * labels,

View File

@ -0,0 +1,81 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/getLeastSupertype.h>
#include <Core/Types_fwd.h>
#include <DataTypes/Serializations/ISerialization.h>
#include <Functions/castTypeToEither.h>
#include <Functions/array/arrayScalarProduct.h>
#include <base/types.h>
#include <Functions/FunctionBinaryArithmetic.h>
namespace DB
{
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
struct NameArrayDotProduct
{
static constexpr auto name = "arrayDotProduct";
};
class ArrayDotProductImpl
{
public:
static DataTypePtr getReturnType(const DataTypePtr & left, const DataTypePtr & right)
{
using Types = TypeList<DataTypeFloat32, DataTypeFloat64,
DataTypeUInt8, DataTypeUInt16, DataTypeUInt32, DataTypeUInt64,
DataTypeInt8, DataTypeInt16, DataTypeInt32, DataTypeInt64>;
DataTypePtr result_type;
bool valid = castTypeToEither(Types{}, left.get(), [&](const auto & left_)
{
return castTypeToEither(Types{}, right.get(), [&](const auto & right_)
{
using LeftDataType = typename std::decay_t<decltype(left_)>::FieldType;
using RightDataType = typename std::decay_t<decltype(right_)>::FieldType;
using ResultType = typename NumberTraits::ResultOfAdditionMultiplication<LeftDataType, RightDataType>::Type;
if (std::is_same_v<LeftDataType, Float32> && std::is_same_v<RightDataType, Float32>)
result_type = std::make_shared<DataTypeFloat32>();
else
result_type = std::make_shared<DataTypeFromFieldType<ResultType>>();
return true;
});
});
if (!valid)
throw Exception(
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Arguments of function {} "
"only support: UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64, Float32, Float64.",
std::string(NameArrayDotProduct::name));
return result_type;
}
template <typename ResultType, typename T, typename U>
static inline NO_SANITIZE_UNDEFINED ResultType apply(
const T * left,
const U * right,
size_t size)
{
ResultType result = 0;
for (size_t i = 0; i < size; ++i)
result += static_cast<ResultType>(left[i]) * static_cast<ResultType>(right[i]);
return result;
}
};
using FunctionArrayDotProduct = FunctionArrayScalarProduct<ArrayDotProductImpl, NameArrayDotProduct>;
REGISTER_FUNCTION(ArrayDotProduct)
{
factory.registerFunction<FunctionArrayDotProduct>();
}
// These functions are used by TupleOrArrayFunction in Function/vectorFunctions.cpp
FunctionPtr createFunctionArrayDotProduct(ContextPtr context_) { return FunctionArrayDotProduct::create(context_); }
}

View File

@ -6,6 +6,7 @@
#include <Functions/FunctionHelpers.h>
#include <Functions/IFunction.h>
#include <Interpreters/Context_fwd.h>
#include <Core/TypeId.h>
namespace DB
@ -18,6 +19,7 @@ namespace ErrorCodes
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
@ -29,29 +31,28 @@ public:
static FunctionPtr create(ContextPtr) { return std::make_shared<FunctionArrayScalarProduct>(); }
private:
using ResultColumnType = ColumnVector<typename Method::ResultType>;
template <typename T>
template <typename ResultType, typename T>
ColumnPtr executeNumber(const ColumnsWithTypeAndName & arguments) const
{
ColumnPtr res;
if ( (res = executeNumberNumber<T, UInt8>(arguments))
|| (res = executeNumberNumber<T, UInt16>(arguments))
|| (res = executeNumberNumber<T, UInt32>(arguments))
|| (res = executeNumberNumber<T, UInt64>(arguments))
|| (res = executeNumberNumber<T, Int8>(arguments))
|| (res = executeNumberNumber<T, Int16>(arguments))
|| (res = executeNumberNumber<T, Int32>(arguments))
|| (res = executeNumberNumber<T, Int64>(arguments))
|| (res = executeNumberNumber<T, Float32>(arguments))
|| (res = executeNumberNumber<T, Float64>(arguments)))
if ( (res = executeNumberNumber<ResultType, T, UInt8>(arguments))
|| (res = executeNumberNumber<ResultType, T, UInt16>(arguments))
|| (res = executeNumberNumber<ResultType, T, UInt32>(arguments))
|| (res = executeNumberNumber<ResultType, T, UInt64>(arguments))
|| (res = executeNumberNumber<ResultType, T, Int8>(arguments))
|| (res = executeNumberNumber<ResultType, T, Int16>(arguments))
|| (res = executeNumberNumber<ResultType, T, Int32>(arguments))
|| (res = executeNumberNumber<ResultType, T, Int64>(arguments))
|| (res = executeNumberNumber<ResultType, T, Float32>(arguments))
|| (res = executeNumberNumber<ResultType, T, Float64>(arguments)))
return res;
return nullptr;
}
template <typename T, typename U>
template <typename ResultType, typename T, typename U>
ColumnPtr executeNumberNumber(const ColumnsWithTypeAndName & arguments) const
{
ColumnPtr col1 = arguments[0].column->convertToFullColumnIfConst();
@ -72,7 +73,7 @@ private:
if (!col_nested1 || !col_nested2)
return nullptr;
auto col_res = ResultColumnType::create();
auto col_res = ColumnVector<ResultType>::create();
vector(
col_nested1->getData(),
@ -83,12 +84,12 @@ private:
return col_res;
}
template <typename T, typename U>
template <typename ResultType, typename T, typename U>
static NO_INLINE void vector(
const PaddedPODArray<T> & data1,
const PaddedPODArray<U> & data2,
const ColumnArray::Offsets & offsets,
PaddedPODArray<typename Method::ResultType> & result)
PaddedPODArray<ResultType> & result)
{
size_t size = offsets.size();
result.resize(size);
@ -97,7 +98,7 @@ private:
for (size_t i = 0; i < size; ++i)
{
size_t array_size = offsets[i] - current_offset;
result[i] = Method::apply(&data1[current_offset], &data2[current_offset], array_size);
result[i] = Method::template apply<ResultType, T, U>(&data1[current_offset], &data2[current_offset], array_size);
current_offset = offsets[i];
}
}
@ -130,24 +131,51 @@ public:
return Method::getReturnType(nested_types[0], nested_types[1]);
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t /* input_rows_count */) const override
template <typename ResultType>
ColumnPtr executeWithResultType(const ColumnsWithTypeAndName & arguments) const
{
ColumnPtr res;
if (!((res = executeNumber<UInt8>(arguments))
|| (res = executeNumber<UInt16>(arguments))
|| (res = executeNumber<UInt32>(arguments))
|| (res = executeNumber<UInt64>(arguments))
|| (res = executeNumber<Int8>(arguments))
|| (res = executeNumber<Int16>(arguments))
|| (res = executeNumber<Int32>(arguments))
|| (res = executeNumber<Int64>(arguments))
|| (res = executeNumber<Float32>(arguments))
|| (res = executeNumber<Float64>(arguments))))
if (!((res = executeNumber<ResultType, UInt8>(arguments))
|| (res = executeNumber<ResultType, UInt16>(arguments))
|| (res = executeNumber<ResultType, UInt32>(arguments))
|| (res = executeNumber<ResultType, UInt64>(arguments))
|| (res = executeNumber<ResultType, Int8>(arguments))
|| (res = executeNumber<ResultType, Int16>(arguments))
|| (res = executeNumber<ResultType, Int32>(arguments))
|| (res = executeNumber<ResultType, Int64>(arguments))
|| (res = executeNumber<ResultType, Float32>(arguments))
|| (res = executeNumber<ResultType, Float64>(arguments))))
throw Exception(ErrorCodes::ILLEGAL_COLUMN,
"Illegal column {} of first argument of function {}", arguments[0].column->getName(), getName());
return res;
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t /* input_rows_count */) const override
{
switch (result_type->getTypeId())
{
#define SUPPORTED_TYPE(type) \
case TypeIndex::type: \
return executeWithResultType<type>(arguments); \
break;
SUPPORTED_TYPE(UInt8)
SUPPORTED_TYPE(UInt16)
SUPPORTED_TYPE(UInt32)
SUPPORTED_TYPE(UInt64)
SUPPORTED_TYPE(Int8)
SUPPORTED_TYPE(Int16)
SUPPORTED_TYPE(Int32)
SUPPORTED_TYPE(Int64)
SUPPORTED_TYPE(Float32)
SUPPORTED_TYPE(Float64)
#undef SUPPORTED_TYPE
default:
throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected result type {}", result_type->getName());
}
}
};
}

View File

@ -38,7 +38,7 @@ public:
return SearchSymbols {std::string{needles.data(), needles.size()}};
}
SearchSymbols getReadNeedles(const Configuration & extractor_configuration)
SearchSymbols getReadKeyNeedles(const Configuration & extractor_configuration)
{
const auto & [key_value_delimiter, quoting_character, pair_delimiters]
= extractor_configuration;
@ -57,6 +57,26 @@ public:
return SearchSymbols {std::string{needles.data(), needles.size()}};
}
SearchSymbols getReadValueNeedles(const Configuration & extractor_configuration)
{
const auto & [key_value_delimiter, quoting_character, pair_delimiters]
= extractor_configuration;
std::vector<char> needles;
needles.push_back(quoting_character);
std::copy(pair_delimiters.begin(), pair_delimiters.end(), std::back_inserter(needles));
if constexpr (WITH_ESCAPING)
{
needles.push_back('\\');
}
return SearchSymbols {std::string{needles.data(), needles.size()}};
}
SearchSymbols getReadQuotedNeedles(const Configuration & extractor_configuration)
{
const auto quoting_character = extractor_configuration.quoting_character;

View File

@ -41,7 +41,8 @@ public:
NeedleFactory<WITH_ESCAPING> needle_factory;
wait_needles = needle_factory.getWaitNeedles(configuration);
read_needles = needle_factory.getReadNeedles(configuration);
read_key_needles = needle_factory.getReadKeyNeedles(configuration);
read_value_needles = needle_factory.getReadValueNeedles(configuration);
read_quoted_needles = needle_factory.getReadQuotedNeedles(configuration);
}
@ -77,7 +78,7 @@ public:
size_t pos = 0;
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_needles))
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_key_needles))
{
auto character_position = p - file.begin();
size_t next_pos = character_position + 1u;
@ -191,10 +192,6 @@ public:
{
return {pos + 1u, State::READING_QUOTED_VALUE};
}
else if (isKeyValueDelimiter(current_character))
{
return {pos, State::WAITING_KEY};
}
if constexpr (WITH_ESCAPING)
{
@ -218,7 +215,7 @@ public:
size_t pos = 0;
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_needles))
while (const auto * p = find_first_symbols_or_null({file.begin() + pos, file.end()}, read_value_needles))
{
const size_t character_position = p - file.begin();
size_t next_pos = character_position + 1u;
@ -237,10 +234,6 @@ public:
}
}
}
else if (isKeyValueDelimiter(*p))
{
return {next_pos, State::WAITING_KEY};
}
else if (isPairDelimiter(*p))
{
value.append(file.begin() + pos, file.begin() + character_position);
@ -300,7 +293,8 @@ public:
private:
SearchSymbols wait_needles;
SearchSymbols read_needles;
SearchSymbols read_key_needles;
SearchSymbols read_value_needles;
SearchSymbols read_quoted_needles;
/*

View File

@ -1429,6 +1429,8 @@ private:
FunctionPtr array_function;
};
extern FunctionPtr createFunctionArrayDotProduct(ContextPtr context_);
extern FunctionPtr createFunctionArrayL1Norm(ContextPtr context_);
extern FunctionPtr createFunctionArrayL2Norm(ContextPtr context_);
extern FunctionPtr createFunctionArrayL2SquaredNorm(ContextPtr context_);
@ -1442,6 +1444,14 @@ extern FunctionPtr createFunctionArrayLpDistance(ContextPtr context_);
extern FunctionPtr createFunctionArrayLinfDistance(ContextPtr context_);
extern FunctionPtr createFunctionArrayCosineDistance(ContextPtr context_);
struct DotProduct
{
static constexpr auto name = "dotProduct";
static constexpr auto CreateTupleFunction = FunctionDotProduct::create;
static constexpr auto CreateArrayFunction = createFunctionArrayDotProduct;
};
struct L1NormTraits
{
static constexpr auto name = "L1Norm";
@ -1530,6 +1540,8 @@ struct CosineDistanceTraits
static constexpr auto CreateArrayFunction = createFunctionArrayCosineDistance;
};
using TupleOrArrayFunctionDotProduct = TupleOrArrayFunction<DotProduct>;
using TupleOrArrayFunctionL1Norm = TupleOrArrayFunction<L1NormTraits>;
using TupleOrArrayFunctionL2Norm = TupleOrArrayFunction<L2NormTraits>;
using TupleOrArrayFunctionL2SquaredNorm = TupleOrArrayFunction<L2SquaredNormTraits>;
@ -1615,8 +1627,8 @@ If the types of the first interval (or the interval in the tuple) and the second
factory.registerFunction<FunctionTupleMultiplyByNumber>();
factory.registerFunction<FunctionTupleDivideByNumber>();
factory.registerFunction<FunctionDotProduct>();
factory.registerAlias("scalarProduct", FunctionDotProduct::name, FunctionFactory::CaseInsensitive);
factory.registerFunction<TupleOrArrayFunctionDotProduct>();
factory.registerAlias("scalarProduct", TupleOrArrayFunctionDotProduct::name, FunctionFactory::CaseInsensitive);
factory.registerFunction<TupleOrArrayFunctionL1Norm>();
factory.registerFunction<TupleOrArrayFunctionL2Norm>();

View File

@ -98,9 +98,9 @@ DynamicResourceManager::State::Resource::~Resource()
if (attached_to != nullptr)
{
ISchedulerNode * root = nodes.find("/")->second.ptr.get();
attached_to->event_queue->enqueue([scheduler = attached_to, root]
attached_to->event_queue->enqueue([my_scheduler = attached_to, root]
{
scheduler->removeChild(root);
my_scheduler->removeChild(root);
});
}
}

View File

@ -133,11 +133,12 @@ ProviderType deduceProviderType(const std::string & url)
Client::Client(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration & client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing)
: Aws::S3::S3Client(credentials_provider, client_configuration, std::move(sign_payloads), use_virtual_addressing)
: Aws::S3::S3Client(credentials_provider_, client_configuration, std::move(sign_payloads), use_virtual_addressing)
, credentials_provider(credentials_provider_)
, max_redirects(max_redirects_)
, sse_kms_config(std::move(sse_kms_config_))
, log(&Poco::Logger::get("S3Client"))
@ -177,6 +178,7 @@ Client::Client(
Client::Client(const Client & other)
: Aws::S3::S3Client(other)
, initial_endpoint(other.initial_endpoint)
, credentials_provider(other.credentials_provider)
, explicit_region(other.explicit_region)
, detect_region(other.detect_region)
, provider_type(other.provider_type)
@ -188,6 +190,11 @@ Client::Client(const Client & other)
ClientCacheRegistry::instance().registerClient(cache);
}
Aws::Auth::AWSCredentials Client::getCredentials() const
{
return credentials_provider->GetAWSCredentials();
}
bool Client::checkIfWrongRegionDefined(const std::string & bucket, const Aws::S3::S3Error & error, std::string & region) const
{
if (detect_region)
@ -711,7 +718,8 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
const String & server_side_encryption_customer_key_base64,
ServerSideEncryptionKMSConfig sse_kms_config,
HTTPHeaderEntries headers,
CredentialsConfiguration credentials_configuration)
CredentialsConfiguration credentials_configuration,
const String & session_token)
{
PocoHTTPClientConfiguration client_configuration = cfg_;
client_configuration.updateSchemeAndRegion();
@ -735,7 +743,7 @@ std::unique_ptr<S3::Client> ClientFactory::create( // NOLINT
// These will be added after request signing
client_configuration.extra_headers = std::move(headers);
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key);
Aws::Auth::AWSCredentials credentials(access_key_id, secret_access_key, session_token);
auto credentials_provider = std::make_shared<S3CredentialsProviderChain>(
client_configuration,
std::move(credentials),

View File

@ -138,6 +138,9 @@ public:
/// Returns the initial endpoint.
const String & getInitialEndpoint() const { return initial_endpoint; }
const String & getRegion() const { return explicit_region; }
Aws::Auth::AWSCredentials getCredentials() const;
/// Decorator for RetryStrategy needed for this client to work correctly.
/// We want to manually handle permanent moves (status code 301) because:
@ -207,7 +210,7 @@ private:
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider>& credentials_provider,
const std::shared_ptr<Aws::Auth::AWSCredentialsProvider> & credentials_provider_,
const Aws::Client::ClientConfiguration& client_configuration,
Aws::Client::AWSAuthV4Signer::PayloadSigningPolicy sign_payloads,
bool use_virtual_addressing);
@ -247,6 +250,7 @@ private:
void insertRegionOverride(const std::string & bucket, const std::string & region) const;
String initial_endpoint;
std::shared_ptr<Aws::Auth::AWSCredentialsProvider> credentials_provider;
std::string explicit_region;
mutable bool detect_region = true;
@ -282,7 +286,8 @@ public:
const String & server_side_encryption_customer_key_base64,
ServerSideEncryptionKMSConfig sse_kms_config,
HTTPHeaderEntries headers,
CredentialsConfiguration credentials_configuration);
CredentialsConfiguration credentials_configuration,
const String & session_token = "");
PocoHTTPClientConfiguration createClientConfiguration(
const String & force_region,

View File

@ -92,6 +92,7 @@ void doWriteRequest(std::shared_ptr<const DB::S3::Client> client, const DB::S3::
client,
uri.bucket,
uri.key,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings
);

View File

@ -31,13 +31,15 @@ TimeoutSetter::~TimeoutSetter()
{
try
{
bool connected = socket.impl()->initialized();
if (!connected)
return;
socket.setSendTimeout(old_send_timeout);
socket.setReceiveTimeout(old_receive_timeout);
}
catch (...)
{
/// Sometimes caught on Mac OS X. This message can be safely ignored.
/// If you are developer using Mac, please debug this error message by yourself.
tryLogCurrentException("Client", "TimeoutSetter: Can't reset timeouts");
}
}

View File

@ -79,11 +79,13 @@ WriteBufferFromS3::WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_,
ThreadPoolCallbackRunner<void> schedule_,
const WriteSettings & write_settings_)
: bucket(bucket_)
: WriteBufferFromFileBase(buf_size_, nullptr, 0)
, bucket(bucket_)
, key(key_)
, request_settings(request_settings_)
, upload_settings(request_settings.getUploadSettings())

View File

@ -5,7 +5,7 @@
#if USE_AWS_S3
#include <base/types.h>
#include <IO/BufferWithOwnMemory.h>
#include <IO/WriteBufferFromFileBase.h>
#include <IO/WriteBuffer.h>
#include <IO/WriteSettings.h>
#include <Storages/StorageS3Settings.h>
@ -24,13 +24,14 @@ namespace DB
* Data is divided on chunks with size greater than 'minimum_upload_part_size'. Last chunk can be less than this threshold.
* Each chunk is written as a part to S3.
*/
class WriteBufferFromS3 final : public BufferWithOwnMemory<WriteBuffer>
class WriteBufferFromS3 final : public WriteBufferFromFileBase
{
public:
WriteBufferFromS3(
std::shared_ptr<const S3::Client> client_ptr_,
const String & bucket_,
const String & key_,
size_t buf_size_,
const S3Settings::RequestSettings & request_settings_,
std::optional<std::map<String, String>> object_metadata_ = std::nullopt,
ThreadPoolCallbackRunner<void> schedule_ = {},
@ -39,8 +40,9 @@ public:
~WriteBufferFromS3() override;
void nextImpl() override;
void preFinalize() override;
std::string getFileName() const override { return key; }
void sync() override { next(); }
public:
class IBufferAllocationPolicy
{
public:

View File

@ -529,6 +529,7 @@ public:
client,
bucket,
file_name,
DBMS_DEFAULT_BUFFER_SIZE,
request_settings,
std::nullopt,
getAsyncPolicy().getScheduler());

View File

@ -1216,11 +1216,22 @@ void ActionsMatcher::visit(const ASTFunction & node, const ASTPtr & ast, Data &
else if (data.is_create_parameterized_view && query_parameter)
{
const auto data_type = DataTypeFactory::instance().get(query_parameter->type);
ColumnWithTypeAndName column(data_type,query_parameter->getColumnName());
/// Use getUniqueName() to allow multiple use of query parameter in the query:
///
/// CREATE VIEW view AS
/// SELECT *
/// FROM system.one
/// WHERE dummy = {k1:Int}+1 OR dummy = {k1:Int}+2
/// ^^ ^^
///
/// NOTE: query in the VIEW will not be modified this is needed
/// only during analysis for CREATE VIEW to avoid duplicated
/// column names.
ColumnWithTypeAndName column(data_type, data.getUniqueName("__" + query_parameter->getColumnName()));
data.addColumn(column);
argument_types.push_back(data_type);
argument_names.push_back(query_parameter->name);
argument_names.push_back(column.name);
}
else
{

View File

@ -487,8 +487,8 @@ Cluster::Cluster(const Poco::Util::AbstractConfiguration & config,
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", replica_key);
}
addShard(settings, std::move(replica_addresses), false, current_shard_num,
std::move(insert_paths), /* treat_local_as_remote */ weight, internal_replication);
addShard(settings, std::move(replica_addresses), /* treat_local_as_remote = */ false, current_shard_num,
std::move(insert_paths), weight, internal_replication);
}
else
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG, "Unknown element in config: {}", key);

Some files were not shown because too many files have changed in this diff Show More