Merge branch 'master' into fix_database_replica_recovery

Commit 277b42451e by alesapin, 2023-05-09 13:06:45 +02:00 (committed by GitHub)
192 changed files with 1743 additions and 1838 deletions


@ -1341,6 +1341,40 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestReleaseAnalyzer:
needs: [BuilderDebRelease]
runs-on: [self-hosted, func-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/stateless_analyzer
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Stateless tests (release, analyzer)
REPO_COPY=${{runner.temp}}/stateless_analyzer/ClickHouse
KILL_TIMEOUT=10800
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Functional test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 functional_test_check.py "$CHECK_NAME" "$KILL_TIMEOUT"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
FunctionalStatelessTestAarch64:
needs: [BuilderDebAarch64]
runs-on: [self-hosted, func-tester-aarch64]

.gitmodules vendored

@ -253,6 +253,9 @@
[submodule "contrib/qpl"]
path = contrib/qpl
url = https://github.com/intel/qpl
[submodule "contrib/idxd-config"]
path = contrib/idxd-config
url = https://github.com/intel/idxd-config
[submodule "contrib/wyhash"]
path = contrib/wyhash
url = https://github.com/wangyi-fudan/wyhash


@ -314,7 +314,14 @@ struct integer<Bits, Signed>::_impl
const T alpha = t / static_cast<T>(max_int);
if (alpha <= static_cast<T>(max_int))
/** Here we have to use strict comparison.
* The max_int is 2^64 - 1.
* When cast to a floating point type, it will be rounded to the closest representable number,
* which is 2^64.
* But 2^64 is not representable in uint64_t,
* so the maximum representable number will be strictly less.
*/
if (alpha < static_cast<T>(max_int))
self = static_cast<uint64_t>(alpha);
else // max(double) / 2^64 will surely contain less than 52 precision bits, so speed up computations.
set_multiplier<double>(self, static_cast<double>(alpha));
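
For context, a minimal standalone check (not part of this diff) illustrating why the strict comparison is needed: casting `2^64 - 1` to `double` rounds up to `2^64`, so a `<=` comparison would admit a value that no longer fits into `uint64_t`.

```cpp
#include <cstdint>
#include <cstdio>
#include <limits>

int main()
{
    const uint64_t max_int = std::numeric_limits<uint64_t>::max();  // 2^64 - 1
    const double as_double = static_cast<double>(max_int);          // rounds up to 2^64

    // 2^64 - 1 has no exact double representation, so the cast rounds to 2^64:
    std::printf("%d\n", as_double == 18446744073709551616.0);       // prints 1

    // Hence `alpha <= static_cast<double>(max_int)` would also accept alpha == 2^64,
    // which does not fit into uint64_t; the strict `<` comparison avoids that.
}
```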


@ -9,27 +9,19 @@ if (CMAKE_CXX_COMPILER_LAUNCHER MATCHES "ccache" OR CMAKE_C_COMPILER_LAUNCHER MA
return()
endif()
set(ENABLE_CCACHE "default" CACHE STRING "Deprecated, use COMPILER_CACHE=(auto|ccache|sccache|disabled)")
if (NOT ENABLE_CCACHE STREQUAL "default")
message(WARNING "The -DENABLE_CCACHE is deprecated in favor of -DCOMPILER_CACHE")
endif()
set(COMPILER_CACHE "auto" CACHE STRING "Speedup re-compilations using the caching tools; valid options are 'auto' (ccache, then sccache), 'ccache', 'sccache', or 'disabled'")
# It has pretty complex logic, because the ENABLE_CCACHE is deprecated, but still should
# control the COMPILER_CACHE
# After it will be completely removed, the following block will be much simpler
if (COMPILER_CACHE STREQUAL "ccache" OR (ENABLE_CCACHE AND NOT ENABLE_CCACHE STREQUAL "default"))
find_program (CCACHE_EXECUTABLE ccache)
elseif(COMPILER_CACHE STREQUAL "disabled" OR NOT ENABLE_CCACHE STREQUAL "default")
message(STATUS "Using *ccache: no (disabled via configuration)")
return()
elseif(COMPILER_CACHE STREQUAL "auto")
if(COMPILER_CACHE STREQUAL "auto")
find_program (CCACHE_EXECUTABLE ccache sccache)
elseif (COMPILER_CACHE STREQUAL "ccache")
find_program (CCACHE_EXECUTABLE ccache)
elseif(COMPILER_CACHE STREQUAL "sccache")
find_program (CCACHE_EXECUTABLE sccache)
elseif(COMPILER_CACHE STREQUAL "disabled")
message(STATUS "Using *ccache: no (disabled via configuration)")
return()
else()
message(${RECONFIGURE_MESSAGE_LEVEL} "The COMPILER_CACHE must be one of (auto|ccache|sccache|disabled), given '${COMPILER_CACHE}'")
message(${RECONFIGURE_MESSAGE_LEVEL} "The COMPILER_CACHE must be one of (auto|ccache|sccache|disabled), value: '${COMPILER_CACHE}'")
endif()


@ -177,7 +177,19 @@ endif()
add_contrib (sqlite-cmake sqlite-amalgamation)
add_contrib (s2geometry-cmake s2geometry)
add_contrib (c-ares-cmake c-ares)
add_contrib (qpl-cmake qpl)
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (ENABLE_QPL)
add_contrib (idxd-config-cmake idxd-config)
add_contrib (qpl-cmake qpl) # requires: idxd-config
else()
message(STATUS "Not using QPL")
endif ()
add_contrib (morton-nd-cmake morton-nd)
if (ARCH_S390X)
add_contrib(crc32-s390x-cmake crc32-s390x)

contrib/idxd-config vendored Submodule

@ -0,0 +1 @@
Subproject commit f6605c41a735e3fdfef2d2d18655a33af6490b99


@ -0,0 +1,23 @@
## accel_config is the utility library required by QPL-Deflate codec for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config-cmake/include")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(_accel-config ${SRCS})
target_compile_options(_accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(_accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
target_include_directories(_accel-config SYSTEM BEFORE
PUBLIC ${LIBACCEL_SOURCE_DIR}/accfg)
add_library(ch_contrib::accel-config ALIAS _accel-config)


@ -1,36 +1,5 @@
## The Intel® QPL provides high performance implementations of data processing functions for existing hardware accelerator, and/or software path in case if hardware accelerator is not available.
if (OS_LINUX AND ARCH_AMD64 AND (ENABLE_AVX2 OR ENABLE_AVX512))
option (ENABLE_QPL "Enable Intel® Query Processing Library" ${ENABLE_LIBRARIES})
elseif(ENABLE_QPL)
message (${RECONFIGURE_MESSAGE_LEVEL} "QPL library is only supported on x86_64 arch with avx2/avx512 support")
endif()
if (NOT ENABLE_QPL)
message(STATUS "Not using QPL")
return()
endif()
## QPL has build dependency on libaccel-config. Here is to build libaccel-config which is required by QPL.
## libaccel-config is the utility library for controlling and configuring Intel® In-Memory Analytics Accelerator (Intel® IAA).
set (LIBACCEL_SOURCE_DIR "${ClickHouse_SOURCE_DIR}/contrib/idxd-config")
set (UUID_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake")
set (LIBACCEL_HEADER_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl-cmake/idxd-header")
set (SRCS
"${LIBACCEL_SOURCE_DIR}/accfg/lib/libaccfg.c"
"${LIBACCEL_SOURCE_DIR}/util/log.c"
"${LIBACCEL_SOURCE_DIR}/util/sysfs.c"
)
add_library(accel-config ${SRCS})
target_compile_options(accel-config PRIVATE "-D_GNU_SOURCE")
target_include_directories(accel-config BEFORE
PRIVATE ${UUID_DIR}
PRIVATE ${LIBACCEL_HEADER_DIR}
PRIVATE ${LIBACCEL_SOURCE_DIR})
## QPL build start here.
set (QPL_PROJECT_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl")
set (QPL_SRC_DIR "${ClickHouse_SOURCE_DIR}/contrib/qpl/sources")
set (QPL_BINARY_DIR "${ClickHouse_BINARY_DIR}/build/contrib/qpl")
@ -342,12 +311,12 @@ target_compile_definitions(_qpl
PUBLIC -DENABLE_QPL_COMPRESSION)
target_link_libraries(_qpl
PRIVATE accel-config
PRIVATE ch_contrib::accel-config
PRIVATE ch_contrib::isal
PRIVATE ${CMAKE_DL_LIBS})
add_library (ch_contrib::qpl ALIAS _qpl)
target_include_directories(_qpl SYSTEM BEFORE
PUBLIC "${QPL_PROJECT_DIR}/include"
PUBLIC "${LIBACCEL_SOURCE_DIR}/accfg"
PUBLIC ${UUID_DIR})
add_library (ch_contrib::qpl ALIAS _qpl)


@ -36,12 +36,10 @@ RUN arch=${TARGETARCH:-amd64} \
# repo versions doesn't work correctly with C++17
# also we push reports to s3, so we add index.html to subfolder urls
# https://github.com/ClickHouse-Extras/woboq_codebrowser/commit/37e15eaf377b920acb0b48dbe82471be9203f76b
RUN git clone https://github.com/ClickHouse/woboq_codebrowser \
&& cd woboq_codebrowser \
RUN git clone --depth=1 https://github.com/ClickHouse/woboq_codebrowser /woboq_codebrowser \
&& cd /woboq_codebrowser \
&& cmake . -G Ninja -DCMAKE_BUILD_TYPE=Release -DCMAKE_CXX_COMPILER=clang\+\+-${LLVM_VERSION} -DCMAKE_C_COMPILER=clang-${LLVM_VERSION} \
&& ninja \
&& cd .. \
&& rm -rf woboq_codebrowser
&& ninja
ENV CODEGEN=/woboq_codebrowser/generator/codebrowser_generator
ENV CODEINDEX=/woboq_codebrowser/indexgenerator/codebrowser_indexgenerator


@ -184,6 +184,15 @@ sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
```
For systems with `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
```
Any subsequent `yum install` command can be replaced with `zypper install`. To specify a particular version, add `-$VERSION` to the end of the package name, e.g. `clickhouse-client-22.2.2.22`.
#### Install ClickHouse server and client
```bash


@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should
```
:::note ALL
`ALL` is only applicable to the `RESTORE` command.
`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of ClickHouse.
:::
## Background


@ -1045,7 +1045,7 @@ Default value: `0`.
## background_pool_size {#background_pool_size}
Sets the number of threads performing background merges and mutations for tables with MergeTree engines. This setting can also be applied at server startup from the `default` profile configuration for backward compatibility. You can only increase the number of threads at runtime. To lower the number of threads you have to restart the server. By adjusting this setting, you manage CPU and disk load. Smaller pool size utilizes less CPU and disk resources, but background processes advance slower which might eventually impact query performance.
Before changing it, please also take a look at related MergeTree settings, such as [number_of_free_entries_in_pool_to_lower_max_size_of_merge](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-lower-max-size-of-merge) and [number_of_free_entries_in_pool_to_execute_mutation](../../operations/settings/merge-tree-settings.md#number-of-free-entries-in-pool-to-execute-mutation).
@ -1063,8 +1063,8 @@ Default value: 16.
## background_merges_mutations_concurrency_ratio {#background_merges_mutations_concurrency_ratio}
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operation could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
Sets a ratio between the number of threads and the number of background merges and mutations that can be executed concurrently. For example, if the ratio equals to 2 and
`background_pool_size` is set to 16 then ClickHouse can execute 32 background merges concurrently. This is possible, because background operations could be suspended and postponed. This is needed to give small merges more execution priority. You can only increase this ratio at runtime. To lower it you have to restart the server.
The same as for `background_pool_size` setting `background_merges_mutations_concurrency_ratio` could be applied from the `default` profile for backward compatibility.
Possible values:
@ -1079,6 +1079,33 @@ Default value: 2.
<background_merges_mutations_concurrency_ratio>3</background_merges_mutations_concurrency_ratio>
```
## merges_mutations_memory_usage_soft_limit {#merges_mutations_memory_usage_soft_limit}
Sets the limit on how much RAM may be used for performing merge and mutation operations.
Zero means unlimited.
If ClickHouse reaches this limit, it won't schedule any new background merge or mutation operations but will continue to execute already scheduled tasks.
Possible values:
- Any positive integer.
**Example**
```xml
<merges_mutations_memory_usage_soft_limit>0</merges_mutations_memory_usage_soft_limit>
```
## merges_mutations_memory_usage_to_ram_ratio {#merges_mutations_memory_usage_to_ram_ratio}
The default `merges_mutations_memory_usage_soft_limit` value is calculated as `memory_amount * merges_mutations_memory_usage_to_ram_ratio`.
Default value: `0.5`.
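For example, on a server with 64 GiB of RAM and the default ratio of `0.5`, the effective `merges_mutations_memory_usage_soft_limit` is 32 GiB.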
**See also**
- [max_memory_usage](../../operations/settings/query-complexity.md#settings_max_memory_usage)
- [merges_mutations_memory_usage_soft_limit](#merges_mutations_memory_usage_soft_limit)
## background_merges_mutations_scheduling_policy {#background_merges_mutations_scheduling_policy}
Algorithm used to select the next merge or mutation to be executed by the background thread pool. The policy may be changed at runtime without a server restart.


@ -8,10 +8,6 @@ sidebar_label: Interval
The family of data types representing time and date intervals. The resulting types of the [INTERVAL](../../../sql-reference/operators/index.md#operator-interval) operator.
:::note
`Interval` data type values can't be stored in tables.
:::
Structure:
- Time interval as an unsigned integer value.
@ -19,6 +15,9 @@ Structure:
Supported interval types:
- `NANOSECOND`
- `MICROSECOND`
- `MILLISECOND`
- `SECOND`
- `MINUTE`
- `HOUR`


@ -78,6 +78,22 @@ GROUP BY
│ 1 │ Bobruisk │ Firefox │
└─────────────┴──────────┴─────────┘
```
### Important note!
Using multiple `arrayJoin` with the same expression may not produce the expected result, because the repeated expression can be optimized into a single evaluation.
In such cases, consider modifying the repeated array expression with extra operations that do not affect the join result, e.g. `arrayJoin(arraySort(arr))`, `arrayJoin(arrayConcat(arr, []))`
Example:
```sql
SELECT
arrayJoin(dice) as first_throw,
/* arrayJoin(dice) as second_throw */ -- is technically correct, but will annihilate result set
arrayJoin(arrayConcat(dice, [])) as second_throw -- intentionally changed expression to force re-evaluation
FROM (
SELECT [1, 2, 3, 4, 5, 6] as dice
);
```
Note the [ARRAY JOIN](../statements/select/array-join.md) syntax in the SELECT query, which provides broader possibilities.
`ARRAY JOIN` allows you to convert multiple arrays with the same number of elements at a time.


@ -26,19 +26,27 @@ SELECT
## makeDate
Creates a [Date](../../sql-reference/data-types/date.md) from a year, month and day argument.
Creates a [Date](../../sql-reference/data-types/date.md)
- from a year, month and day argument, or
- from a year and day of year argument.
**Syntax**
``` sql
makeDate(year, month, day)
makeDate(year, month, day);
makeDate(year, day_of_year);
```
Alias:
- `MAKEDATE(year, month, day);`
- `MAKEDATE(year, day_of_year);`
**Arguments**
- `year` — Year. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `month` — Month. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `day` — Day. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
- `day_of_year` — Day of the year. [Integer](../../sql-reference/data-types/int-uint.md), [Float](../../sql-reference/data-types/float.md) or [Decimal](../../sql-reference/data-types/decimal.md).
**Returned value**
@ -48,6 +56,8 @@ Type: [Date](../../sql-reference/data-types/date.md).
**Example**
Create a Date from a year, month and day:
``` sql
SELECT makeDate(2023, 2, 28) AS Date;
```
@ -60,6 +70,19 @@ Result:
└────────────┘
```
Create a Date from a year and day of year argument:
``` sql
SELECT makeDate(2023, 42) AS Date;
```
Result:
``` text
┌───────date─┐
│ 2023-02-11 │
└────────────┘
```
## makeDate32
Like [makeDate](#makeDate) but produces a [Date32](../../sql-reference/data-types/date32.md).
@ -108,6 +131,12 @@ Result:
Like [makeDateTime](#makedatetime) but produces a [DateTime64](../../sql-reference/data-types/datetime64.md).
**Syntax**
``` sql
makeDateTime64(year, month, day, hour, minute, second[, fraction[, precision[, timezone]]])
```
## timeZone
Returns the timezone of the server.


@ -1215,96 +1215,3 @@ Result:
│ A240 │
└──────────────────┘
```
## extractKeyValuePairs
Extracts key-value pairs from any string. The string does not need to be 100% structured in a key value pair format;
It can contain noise (e.g. log files). The key-value pair format to be interpreted should be specified via function arguments.
A key-value pair consists of a key followed by a `key_value_delimiter` and a value. Quoted keys and values are also supported. Key value pairs must be separated by pair delimiters.
**Syntax**
``` sql
extractKeyValuePairs(data, [key_value_delimiter], [pair_delimiter], [quoting_character])
```
**Arguments**
- `data` - String to extract key-value pairs from. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `key_value_delimiter` - Character to be used as delimiter between the key and the value. Defaults to `:`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `pair_delimiters` - Set of character to be used as delimiters between pairs. Defaults to `\space`, `,` and `;`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `quoting_character` - Character to be used as quoting character. Defaults to `"`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
**Returned values**
- The extracted key-value pairs in a Map(String, String).
**Examples**
Query:
**Simple case**
``` sql
arthur :) select extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
SELECT extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
Query id: f9e0ca6f-3178-4ee2-aa2c-a5517abb9cee
┌─kv──────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil'} │
└─────────────────────────────────────────────────────────────────────────┘
```
**Single quote as quoting character**
``` sql
arthur :) select extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
SELECT extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
Query id: 0e22bf6b-9844-414a-99dc-32bf647abd5e
┌─kv───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil','last_key':'last_value'} │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
**Escape sequences without escape sequences support**
``` sql
arthur :) select extractKeyValuePairs('age:a\\x0A\\n\\0') as kv
SELECT extractKeyValuePairs('age:a\\x0A\\n\\0') AS kv
Query id: e9fd26ee-b41f-4a11-b17f-25af6fd5d356
┌─kv─────────────────────┐
│ {'age':'a\\x0A\\n\\0'} │
└────────────────────────┘
```
## extractKeyValuePairsWithEscaping
Same as `extractKeyValuePairs` but with escaping support.
Escape sequences supported: `\x`, `\N`, `\a`, `\b`, `\e`, `\f`, `\n`, `\r`, `\t`, `\v` and `\0`.
Non standard escape sequences are returned as it is (including the backslash) unless they are one of the following:
`\\`, `'`, `"`, `backtick`, `/`, `=` or ASCII control characters (c <= 31).
This function will satisfy the use case where pre-escaping and post-escaping are not suitable. For instance, consider the following
input string: `a: "aaaa\"bbb"`. The expected output is: `a: aaaa\"bbbb`.
- Pre-escaping: Pre-escaping it will output: `a: "aaaa"bbb"` and `extractKeyValuePairs` will then output: `a: aaaa`
- Post-escaping: `extractKeyValuePairs` will output `a: aaaa\` and post-escaping will keep it as it is.
Leading escape sequences will be skipped in keys and will be considered invalid for values.
**Escape sequences with escape sequence support turned on**
``` sql
arthur :) select extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') as kv
SELECT extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') AS kv
Query id: 44c114f0-5658-4c75-ab87-4574de3a1645
┌─kv────────────────┐
│ {'age':'a\n\n\0'} │
└───────────────────┘
```


@ -109,6 +109,108 @@ SELECT mapFromArrays([1, 2, 3], map('a', 1, 'b', 2, 'c', 3))
└───────────────────────────────────────────────────────┘
```
## extractKeyValuePairs
Extracts key-value pairs, i.e. a [Map(String, String)](../../sql-reference/data-types/map.md), from a string. Parsing is robust towards noise (e.g. log files).
A key-value pair consists of a key, followed by a `key_value_delimiter` and a value. Key value pairs must be separated by `pair_delimiter`. Quoted keys and values are also supported.
**Syntax**
``` sql
extractKeyValuePairs(data[, key_value_delimiter[, pair_delimiter[, quoting_character]]])
```
Alias:
- `str_to_map`
- `mapFromString`
**Arguments**
- `data` - String to extract key-value pairs from. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `key_value_delimiter` - Character to be used as delimiter between the key and the value. Defaults to `:`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `pair_delimiters` - Set of characters to be used as delimiters between pairs. Defaults to ` `, `,` and `;`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
- `quoting_character` - Character to be used as quoting character. Defaults to `"`. [String](../../sql-reference/data-types/string.md) or [FixedString](../../sql-reference/data-types/fixedstring.md).
**Returned values**
- A [Map(String, String)](../../sql-reference/data-types/map.md) of key-value pairs.
**Examples**
Simple case:
``` sql
SELECT extractKeyValuePairs('name:neymar, age:31 team:psg,nationality:brazil') as kv
```
Result:
``` text
┌─kv──────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil'} │
└─────────────────────────────────────────────────────────────────────────┘
```
Single quote as quoting character:
``` sql
SELECT extractKeyValuePairs('name:\'neymar\';\'age\':31;team:psg;nationality:brazil,last_key:last_value', ':', ';,', '\'') as kv
```
Result:
``` text
┌─kv───────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ {'name':'neymar','age':'31','team':'psg','nationality':'brazil','last_key':'last_value'} │
└──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────┘
```
Escape sequences without escape sequences support:
``` sql
SELECT extractKeyValuePairs('age:a\\x0A\\n\\0') AS kv
```
Result:
``` text
┌─kv─────────────────────┐
│ {'age':'a\\x0A\\n\\0'} │
└────────────────────────┘
```
## extractKeyValuePairsWithEscaping
Same as `extractKeyValuePairs` but with escaping support.
Supported escape sequences: `\x`, `\N`, `\a`, `\b`, `\e`, `\f`, `\n`, `\r`, `\t`, `\v` and `\0`.
Non-standard escape sequences are returned as is (including the backslash) unless they are one of the following:
`\\`, `'`, `"`, `backtick`, `/`, `=` or ASCII control characters (c <= 31).
This function will satisfy the use case where pre-escaping and post-escaping are not suitable. For instance, consider the following
input string: `a: "aaaa\"bbb"`. The expected output is: `a: aaaa\"bbb`.
- Pre-escaping: Pre-escaping it will output: `a: "aaaa"bbb"` and `extractKeyValuePairs` will then output: `a: aaaa`
- Post-escaping: `extractKeyValuePairs` will output `a: aaaa\` and post-escaping will keep it as it is.
Leading escape sequences will be skipped in keys and will be considered invalid for values.
**Examples**
Escape sequences with escape sequence support turned on:
``` sql
SELECT extractKeyValuePairsWithEscaping('age:a\\x0A\\n\\0') AS kv
```
Result:
``` result
┌─kv────────────────┐
│ {'age':'a\n\n\0'} │
└───────────────────┘
```
## mapAdd
Collect all the keys and sum corresponding values.


@ -77,15 +77,37 @@ clickhouse-client # or "clickhouse-client --password" if you set up a password.
The ClickHouse team at Yandex recommends using the official precompiled `rpm` packages for CentOS, RedHat, and all other rpm-based Linux distributions.
#### Install the official repository
First, you need to add the official repository:
``` bash
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client
```
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
For systems with the `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
```
Any subsequent `yum install` command can be replaced with `zypper install`. To specify a particular version, add `-$VERSION` to the package name, e.g. `clickhouse-client-22.2.2.22`.
#### Install the server and client
``` bash
sudo yum install -y clickhouse-server clickhouse-client
```
#### Start the server
``` bash
sudo systemctl enable clickhouse-server
sudo systemctl start clickhouse-server
sudo systemctl status clickhouse-server
clickhouse-client # or "clickhouse-client --password" if you set up a password
```
<details markdown="1">


@ -84,6 +84,17 @@ sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
```
For systems with `zypper` package manager (openSUSE, SLES):
``` bash
sudo zypper addrepo -r https://packages.clickhouse.com/rpm/clickhouse.repo -g
sudo zypper --gpg-auto-import-keys refresh clickhouse-stable
sudo zypper install -y clickhouse-server clickhouse-client
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
```
<details markdown="1">
<summary>Deprecated Method for installing rpm-packages</summary>


@ -114,7 +114,7 @@ if (BUILD_STANDALONE_KEEPER)
clickhouse_add_executable(clickhouse-keeper ${CLICKHOUSE_KEEPER_STANDALONE_SOURCES})
# Remove some redundant dependencies
target_compile_definitions (clickhouse-keeper PRIVATE -DKEEPER_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PRIVATE -DCLICKHOUSE_PROGRAM_STANDALONE_BUILD)
target_compile_definitions (clickhouse-keeper PUBLIC -DWITHOUT_TEXT_LOG)
target_include_directories(clickhouse-keeper PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../../src") # uses includes from src directory


@ -57,7 +57,7 @@ int mainEntryClickHouseKeeper(int argc, char ** argv)
}
}
#ifdef KEEPER_STANDALONE_BUILD
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
// Weak symbols don't work correctly on Darwin
// so we have a stub implementation to avoid linker errors


@ -130,6 +130,7 @@ namespace CurrentMetrics
extern const Metric Revision;
extern const Metric VersionInteger;
extern const Metric MemoryTracking;
extern const Metric MergesMutationsMemoryTracking;
extern const Metric MaxDDLEntryID;
extern const Metric MaxPushedDDLEntryID;
}
@ -1225,6 +1226,25 @@ try
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
size_t merges_mutations_memory_usage_soft_limit = server_settings_.merges_mutations_memory_usage_soft_limit;
size_t default_merges_mutations_server_memory_usage = static_cast<size_t>(memory_amount * server_settings_.merges_mutations_memory_usage_to_ram_ratio);
if (merges_mutations_memory_usage_soft_limit == 0 || merges_mutations_memory_usage_soft_limit > default_merges_mutations_server_memory_usage)
{
merges_mutations_memory_usage_soft_limit = default_merges_mutations_server_memory_usage;
LOG_WARNING(log, "Setting merges_mutations_memory_usage_soft_limit was set to {}"
" ({} available * {:.2f} merges_mutations_memory_usage_to_ram_ratio)",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit),
formatReadableSizeWithBinarySuffix(memory_amount),
server_settings_.merges_mutations_memory_usage_to_ram_ratio);
}
LOG_INFO(log, "Merges and mutations memory limit is set to {}",
formatReadableSizeWithBinarySuffix(merges_mutations_memory_usage_soft_limit));
background_memory_tracker.setSoftLimit(merges_mutations_memory_usage_soft_limit);
background_memory_tracker.setDescription("(background)");
background_memory_tracker.setMetric(CurrentMetrics::MergesMutationsMemoryTracking);
total_memory_tracker.setAllowUseJemallocMemory(server_settings_.allow_use_jemalloc_memory);
auto * global_overcommit_tracker = global_context->getGlobalOvercommitTracker();
@ -1238,8 +1258,13 @@ try
global_context->setMacros(std::make_unique<Macros>(*config, "macros", log));
global_context->setExternalAuthenticatorsConfig(*config);
global_context->loadOrReloadDictionaries(*config);
global_context->loadOrReloadUserDefinedExecutableFunctions(*config);
if (global_context->isServerCompletelyStarted())
{
/// It does not make sense to reload anything before server has started.
/// Moreover, it may break initialization order.
global_context->loadOrReloadDictionaries(*config);
global_context->loadOrReloadUserDefinedExecutableFunctions(*config);
}
global_context->setRemoteHostFilter(*config);


@ -335,7 +335,7 @@ public:
if constexpr (std::endian::native == std::endian::little)
hash_value = hash(x);
else
hash_value = __builtin_bswap32(hash(x));
hash_value = std::byteswap(hash(x));
if (!good(hash_value))
return;
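
As a side note, `std::byteswap` from `<bit>` is the C++23 standard replacement for compiler intrinsics such as `__builtin_bswap32`; a minimal sketch (requires `-std=c++23`):

```cpp
#include <bit>
#include <cassert>
#include <cstdint>

int main()
{
    // std::byteswap reverses the byte order of an integer value and is constexpr.
    static_assert(std::byteswap(std::uint32_t{0x12345678}) == 0x78563412);
    assert(std::byteswap(std::uint16_t{0xABCD}) == 0xCDAB);
}
```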


@ -544,6 +544,10 @@ if (TARGET ch_contrib::qpl)
dbms_target_link_libraries(PUBLIC ch_contrib::qpl)
endif ()
if (TARGET ch_contrib::accel-config)
dbms_target_link_libraries(PUBLIC ch_contrib::accel-config)
endif ()
target_link_libraries(clickhouse_common_io PUBLIC boost::context)
dbms_target_link_libraries(PUBLIC boost::context)


@ -53,6 +53,7 @@
M(QueryThread, "Number of query processing threads") \
M(ReadonlyReplica, "Number of Replicated tables that are currently in readonly state due to re-initialization after ZooKeeper session loss or due to startup without ZooKeeper configured.") \
M(MemoryTracking, "Total amount of memory (bytes) allocated by the server.") \
M(MergesMutationsMemoryTracking, "Total amount of memory (bytes) allocated by background tasks (merges and mutations).") \
M(EphemeralNode, "Number of ephemeral nodes hold in ZooKeeper.") \
M(ZooKeeperSession, "Number of sessions (connections) to ZooKeeper. Should be no more than one, because using more than one connection to ZooKeeper may lead to bugs due to lack of linearizability (stale reads) that ZooKeeper consistency model allows.") \
M(ZooKeeperWatch, "Number of watches (event subscriptions) in ZooKeeper.") \


@ -80,6 +80,8 @@ template <
class ClearableHashSet
: public HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>
{
using Cell = ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>;
public:
using Base = HashTable<Key, ClearableHashTableCell<Key, HashTableCell<Key, Hash, ClearableHashSetState>>, Hash, Grower, Allocator>;
using typename Base::LookupResult;
@ -88,6 +90,13 @@ public:
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
@ -103,11 +112,20 @@ class ClearableHashSetWithSavedHash : public HashTable<
Grower,
Allocator>
{
using Cell = ClearableHashTableCell<Key, HashSetCellWithSavedHash<Key, Hash, ClearableHashSetState>>;
public:
void clear()
{
++this->version;
this->m_size = 0;
if constexpr (Cell::need_zero_value_storage)
{
/// clear ZeroValueStorage
if (this->hasZero())
this->clearHasZero();
}
}
};
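
A minimal standalone sketch of the pattern being fixed here, independent of ClickHouse's `HashTable` internals (names are illustrative): hash tables that keep the zero key in a separate "zero value storage" must also reset that storage in `clear()`, otherwise a zero key inserted before `clear()` still appears present afterwards.

```cpp
#include <cassert>
#include <cstdint>
#include <unordered_set>

// Hypothetical simplified set: the zero key lives in a separate flag because the
// main storage (a std::unordered_set stand-in here) is assumed unable to hold it.
struct SetWithZeroStorage
{
    std::unordered_set<uint64_t> cells;  // non-zero keys
    bool has_zero = false;               // "zero value storage"

    void insert(uint64_t key)
    {
        if (key == 0)
            has_zero = true;
        else
            cells.insert(key);
    }

    bool contains(uint64_t key) const { return key == 0 ? has_zero : cells.contains(key); }

    void clear()
    {
        cells.clear();
        has_zero = false;  // the analogue of clearHasZero(); omitting it is the bug fixed above
    }
};

int main()
{
    SetWithZeroStorage set;
    set.insert(0);
    set.insert(42);
    set.clear();
    assert(!set.contains(0));   // would fail if has_zero were not reset in clear()
    assert(!set.contains(42));
}
```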


@ -96,12 +96,17 @@ using namespace std::chrono_literals;
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
MemoryTracker total_memory_tracker(nullptr, VariableContext::Global);
MemoryTracker background_memory_tracker(&total_memory_tracker, VariableContext::User, false);
std::atomic<Int64> MemoryTracker::free_memory_in_allocator_arenas;
MemoryTracker::MemoryTracker(VariableContext level_) : parent(&total_memory_tracker), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_) : parent(parent_), level(level_) {}
MemoryTracker::MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_)
: parent(parent_)
, log_peak_memory_usage_in_destructor(log_peak_memory_usage_in_destructor_)
, level(level_)
{}
MemoryTracker::~MemoryTracker()
{
@ -528,3 +533,10 @@ void MemoryTracker::setOrRaiseProfilerLimit(Int64 value)
while ((value == 0 || old_value < value) && !profiler_limit.compare_exchange_weak(old_value, value))
;
}
bool canEnqueueBackgroundTask()
{
auto limit = background_memory_tracker.getSoftLimit();
auto amount = background_memory_tracker.get();
return limit == 0 || amount < limit;
}
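
A self-contained model (not ClickHouse code; names are illustrative) of the contract `canEnqueueBackgroundTask()` introduces: a soft limit of zero means unlimited, otherwise new merges and mutations are postponed once the background tracker's usage reaches the limit, while already running tasks continue.

```cpp
#include <atomic>
#include <cstdint>
#include <cstdio>

// Simplified stand-in for MemoryTracker: only the parts relevant to the soft limit.
struct Tracker
{
    std::atomic<int64_t> amount{0};
    std::atomic<int64_t> soft_limit{0};

    void alloc(int64_t bytes) { amount.fetch_add(bytes, std::memory_order_relaxed); }
};

Tracker background_tracker;

// Mirrors the logic of canEnqueueBackgroundTask() above:
// zero means "no limit", otherwise compare current usage against the soft limit.
bool canEnqueueBackgroundTask()
{
    const auto limit = background_tracker.soft_limit.load(std::memory_order_relaxed);
    const auto amount = background_tracker.amount.load(std::memory_order_relaxed);
    return limit == 0 || amount < limit;
}

int main()
{
    background_tracker.soft_limit.store(100);
    background_tracker.alloc(60);
    std::printf("%d\n", canEnqueueBackgroundTask());  // 1: below the soft limit
    background_tracker.alloc(50);
    std::printf("%d\n", canEnqueueBackgroundTask());  // 0: new merges would be postponed
}
```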


@ -98,6 +98,7 @@ public:
explicit MemoryTracker(VariableContext level_ = VariableContext::Thread);
explicit MemoryTracker(MemoryTracker * parent_, VariableContext level_ = VariableContext::Thread);
MemoryTracker(MemoryTracker * parent_, VariableContext level_, bool log_peak_memory_usage_in_destructor_);
~MemoryTracker();
@ -110,6 +111,22 @@ public:
return amount.load(std::memory_order_relaxed);
}
// Merges and mutations may pass memory ownership to other threads thus in the end of execution
// MemoryTracker for background task may have a non-zero counter.
// This method is intended to fix the counter inside of background_memory_tracker.
// NOTE: We can't use alloc/free methods to do it, because they also will change the value inside
// of total_memory_tracker.
void adjustOnBackgroundTaskEnd(const MemoryTracker * child)
{
auto background_memory_consumption = child->amount.load(std::memory_order_relaxed);
amount.fetch_sub(background_memory_consumption, std::memory_order_relaxed);
// Also fix CurrentMetrics::MergesMutationsMemoryTracking
auto metric_loaded = metric.load(std::memory_order_relaxed);
if (metric_loaded != CurrentMetrics::end())
CurrentMetrics::sub(metric_loaded, background_memory_consumption);
}
Int64 getPeak() const
{
return peak.load(std::memory_order_relaxed);
@ -220,3 +237,6 @@ public:
};
extern MemoryTracker total_memory_tracker;
extern MemoryTracker background_memory_tracker;
bool canEnqueueBackgroundTask();
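
And a toy illustration (again, not ClickHouse code) of what `adjustOnBackgroundTaskEnd` compensates for: a finished merge may have handed memory ownership to another thread, so its residual counter is subtracted from the background tracker rather than freed.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>

struct ToyTracker
{
    std::atomic<int64_t> amount{0};

    // Analogue of adjustOnBackgroundTaskEnd(): subtract whatever the finished task's
    // tracker still accounts for, because ownership of that memory moved elsewhere.
    void adjustOnBackgroundTaskEnd(const ToyTracker & child)
    {
        amount.fetch_sub(child.amount.load(std::memory_order_relaxed), std::memory_order_relaxed);
    }
};

int main()
{
    ToyTracker background, merge_task;
    merge_task.amount = 100;   // the merge allocated 100 bytes under its own tracker...
    background.amount = 100;   // ...which also propagated to the parent background tracker
    // The merge finished, but the new data part is now owned by another thread,
    // so the background counter is corrected instead of being "freed".
    background.adjustOnBackgroundTaskEnd(merge_task);
    assert(background.amount == 0);
}
```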


@ -191,10 +191,8 @@
\
M(InsertedWideParts, "Number of parts inserted in Wide format.") \
M(InsertedCompactParts, "Number of parts inserted in Compact format.") \
M(InsertedInMemoryParts, "Number of parts inserted in InMemory format.") \
M(MergedIntoWideParts, "Number of parts merged into Wide format.") \
M(MergedIntoCompactParts, "Number of parts merged into Compact format.") \
M(MergedIntoInMemoryParts, "Number of parts in merged into InMemory format.") \
\
M(MergeTreeDataProjectionWriterRows, "Number of rows INSERTed to MergeTree tables projection.") \
M(MergeTreeDataProjectionWriterUncompressedBytes, "Uncompressed bytes (for columns as they stored in memory) INSERTed to MergeTree tables projection.") \


@ -378,6 +378,13 @@ void transpose(const T * src, char * dst, UInt32 num_bits, UInt32 tail = 64)
/// UInt64[N] transposed matrix -> UIntX[64]
template <typename T, bool full = false>
#if defined(__s390x__)
/* Compiler Bug for S390x :- https://github.com/llvm/llvm-project/issues/62572
* Please remove this after the fix is backported
*/
__attribute__((noinline))
#endif
void reverseTranspose(const char * src, T * buf, UInt32 num_bits, UInt32 tail = 64)
{
UInt64 matrix[64] = {};


@ -172,7 +172,7 @@ void registerCodecDeflateQpl(CompressionCodecFactory & factory);
/// Keeper use only general-purpose codecs, so we don't need these special codecs
/// in standalone build
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
void registerCodecDelta(CompressionCodecFactory & factory);
void registerCodecT64(CompressionCodecFactory & factory);
void registerCodecDoubleDelta(CompressionCodecFactory & factory);
@ -188,7 +188,7 @@ CompressionCodecFactory::CompressionCodecFactory()
registerCodecZSTD(*this);
registerCodecLZ4HC(*this);
registerCodecMultiple(*this);
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
registerCodecDelta(*this);
registerCodecT64(*this);
registerCodecDoubleDelta(*this);


@ -42,6 +42,8 @@ namespace DB
M(String, temporary_data_in_cache, "", "Cache disk name for temporary data.", 0) \
M(UInt64, max_server_memory_usage, 0, "Limit on total memory usage. Zero means Unlimited.", 0) \
M(Double, max_server_memory_usage_to_ram_ratio, 0.9, "Same as max_server_memory_usage but in to ram ratio. Allows to lower max memory on low-memory systems.", 0) \
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \


@ -712,6 +712,7 @@ class IColumn;
M(String, additional_result_filter, "", "Additional filter expression which would be applied to query result", 0) \
\
M(String, workload, "default", "Name of workload to be used to access resources", 0) \
M(Milliseconds, storage_system_stack_trace_pipe_read_timeout_ms, 100, "Maximum time to read from a pipe for receiving information from the threads when querying the `system.stack_trace` table. This setting is used for testing purposes and not meant to be changed by users.", 0) \
\
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
\
@ -733,7 +734,7 @@ class IColumn;
M(Bool, multiple_joins_try_to_keep_original_names, false, "Do not add aliases to top level expression list on multiple joins rewrite", 0) \
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, optimize_distinct_in_order, false, "This optimization has a bug and it is disabled. Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, optimize_sorting_by_input_stream_properties, true, "Optimize sorting by sorting properties of input stream", 0) \
M(UInt64, insert_keeper_max_retries, 20, "Max retries for keeper operations during insert", 0) \
M(UInt64, insert_keeper_retry_initial_backoff_ms, 100, "Initial backoff timeout for keeper operations during insert", 0) \


@ -338,7 +338,7 @@ void SettingFieldString::readBinary(ReadBuffer & in)
/// that. The linker does not complain only because clickhouse-keeper does not call any of below
/// functions. A cleaner alternative would be more modular libraries, e.g. one for data types, which
/// could then be linked by the server and the linker.
#ifndef KEEPER_STANDALONE_BUILD
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
SettingFieldMap::SettingFieldMap(const Field & f) : value(fieldToMap(f)) {}


@ -18,7 +18,7 @@
#include "config.h"
#include "config_version.h"
#if USE_SENTRY && !defined(KEEPER_STANDALONE_BUILD)
#if USE_SENTRY && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
# include <sentry.h>
# include <cstdio>


@ -11,9 +11,6 @@ namespace DB
*
* Mostly the same as Int64.
* But also tagged with interval kind.
*
* Intended usage is for temporary elements in expressions,
* not for storing values in tables.
*/
class DataTypeInterval final : public DataTypeNumberBase<Int64>
{
@ -34,7 +31,6 @@ public:
bool equals(const IDataType & rhs) const override;
bool isParametric() const override { return true; }
bool cannotBeStoredInTables() const override { return true; }
bool isCategorial() const override { return false; }
bool canBeInsideNullable() const override { return true; }
};


@ -1,5 +1,6 @@
#include <Databases/DDLDependencyVisitor.h>
#include <Dictionaries/getDictionaryConfigurationFromAST.h>
#include <Databases/removeWhereConditionPlaceholder.h>
#include <Interpreters/Cluster.h>
#include <Interpreters/Context.h>
#include <Interpreters/misc.h>
@ -12,6 +13,8 @@
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Parsers/ParserSelectWithUnionQuery.h>
#include <Parsers/parseQuery.h>
#include <Common/KnownObjectNames.h>
#include <Poco/String.h>
@ -25,6 +28,7 @@ namespace
/// Used to visits ASTCreateQuery and extracts the names of all tables explicitly referenced in the create query.
class DDLDependencyVisitorData
{
friend void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data);
public:
DDLDependencyVisitorData(const ContextPtr & context_, const QualifiedTableName & table_name_, const ASTPtr & ast_)
: create_query(ast_), table_name(table_name_), current_database(context_->getCurrentDatabase()), context(context_)
@ -106,9 +110,17 @@ namespace
if (!info || !info->is_local)
return;
if (info->table_name.database.empty())
info->table_name.database = current_database;
dependencies.emplace(std::move(info->table_name));
if (!info->table_name.table.empty())
{
if (info->table_name.database.empty())
info->table_name.database = current_database;
dependencies.emplace(std::move(info->table_name));
}
else
{
/// We don't have a table name, we have a select query instead
tryVisitNestedSelect(info->query, *this);
}
}
/// ASTTableExpression represents a reference to a table in SELECT query.
@ -424,6 +436,25 @@ namespace
static bool needChildVisit(const ASTPtr &, const ASTPtr & child, const Data & data) { return data.needChildVisit(child); }
static void visit(const ASTPtr & ast, Data & data) { data.visit(ast); }
};
void tryVisitNestedSelect(const String & query, DDLDependencyVisitorData & data)
{
try
{
ParserSelectWithUnionQuery parser;
String description = fmt::format("Query for ClickHouse dictionary {}", data.table_name);
String fixed_query = removeWhereConditionPlaceholder(query);
ASTPtr select = parseQuery(parser, fixed_query, description,
data.context->getSettingsRef().max_query_size, data.context->getSettingsRef().max_parser_depth);
DDLDependencyVisitor::Visitor visitor{data};
visitor.visit(select);
}
catch (...)
{
tryLogCurrentException("DDLDependencyVisitor");
}
}
}


@ -103,7 +103,7 @@ void DDLLoadingDependencyVisitor::visit(const ASTFunctionWithKeyValueArguments &
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
if (info->table_name.database.empty())


@ -137,7 +137,7 @@ namespace
auto config = getDictionaryConfigurationFromAST(data.create_query->as<ASTCreateQuery &>(), data.global_context);
auto info = getInfoIfClickHouseDictionarySource(config, data.global_context);
if (!info || !info->is_local)
if (!info || !info->is_local || info->table_name.table.empty())
return;
auto * source_list = dictionary.source->elements->as<ASTExpressionList>();


@ -0,0 +1,20 @@
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
std::string removeWhereConditionPlaceholder(const std::string & query)
{
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
}


@ -0,0 +1,15 @@
#pragma once
#include <string>
namespace DB
{
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
/** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data.
* Replace {condition} with true_condition for initial dictionary load.
* For next dictionary loads {condition} will be updated with UPDATE_FIELD.
*/
std::string removeWhereConditionPlaceholder(const std::string & query);
}
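
For illustration, a minimal usage sketch of the helper declared above, assuming it is compiled inside the ClickHouse source tree (the query strings are made up):

```cpp
#include <cassert>
#include <Databases/removeWhereConditionPlaceholder.h>

int main()
{
    // Initial load of a dictionary with UPDATE_FIELD: the placeholder becomes a no-op condition.
    assert(DB::removeWhereConditionPlaceholder("SELECT key, value FROM src WHERE {condition}")
           == "SELECT key, value FROM src WHERE (1 = 1)");

    // Queries without the placeholder are returned unchanged.
    assert(DB::removeWhereConditionPlaceholder("SELECT key, value FROM src")
           == "SELECT key, value FROM src");
}
```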


@ -6,7 +6,7 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Dictionaries/DictionaryStructure.h>
#include <Databases/removeWhereConditionPlaceholder.h>
namespace DB
{
@ -24,7 +24,6 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
static constexpr std::string_view CONDITION_PLACEHOLDER_TO_REPLACE_VALUE = "{condition}";
ExternalQueryBuilder::ExternalQueryBuilder(
const DictionaryStructure & dict_struct_,
@ -82,23 +81,8 @@ std::string ExternalQueryBuilder::composeLoadAllQuery() const
writeChar(';', out);
return out.str();
}
else
{
/** In case UPDATE_FIELD is specified in {condition} for dictionary that must load all data.
* Replace {condition} with true_condition for initial dictionary load.
* For next dictionary loads {condition} will be updated with UPDATE_FIELD.
*/
static constexpr auto true_condition = "(1 = 1)";
auto condition_position = query.find(CONDITION_PLACEHOLDER_TO_REPLACE_VALUE);
if (condition_position != std::string::npos)
{
auto query_copy = query;
query_copy.replace(condition_position, CONDITION_PLACEHOLDER_TO_REPLACE_VALUE.size(), true_condition);
return query_copy;
}
return query;
}
return removeWhereConditionPlaceholder(query);
}
void ExternalQueryBuilder::composeLoadAllQuery(WriteBuffer & out) const


@ -649,10 +649,12 @@ getInfoIfClickHouseDictionarySource(DictionaryConfigurationPtr & config, Context
String database = config->getString("dictionary.source.clickhouse.db", "");
String table = config->getString("dictionary.source.clickhouse.table", "");
if (table.empty())
return {};
info.query = config->getString("dictionary.source.clickhouse.query", "");
info.table_name = {database, table};
if (!table.empty())
info.table_name = {database, table};
else if (info.query.empty())
return {};
try
{


@ -18,6 +18,7 @@ getDictionaryConfigurationFromAST(const ASTCreateQuery & query, ContextPtr conte
struct ClickHouseDictionarySourceInfo
{
QualifiedTableName table_name;
String query;
bool is_local = false;
};


@ -47,14 +47,14 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
IAsynchronousReader & reader_,
const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek_)
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0)
, read_settings(settings_)
, reader(reader_)
, base_priority(settings_.priority)
, impl(impl_)
, prefetch_buffer(settings_.prefetch_buffer_size)
, min_bytes_for_seek(min_bytes_for_seek_)
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr
? CurrentThread::getQueryId() : "")
, current_reader_id(getRandomASCIIString(8))
@ -63,6 +63,8 @@ AsynchronousReadIndirectBufferFromRemoteFS::AsynchronousReadIndirectBufferFromRe
#else
, log(&Poco::Logger::get("AsyncBuffer(" + impl->getFileName() + ")"))
#endif
, async_read_counters(async_read_counters_)
, prefetches_log(prefetches_log_)
{
ProfileEvents::increment(ProfileEvents::RemoteFSBuffers);
}
@ -111,7 +113,7 @@ bool AsynchronousReadIndirectBufferFromRemoteFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadIndirectBufferFromRemoteFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, async_read_counters);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;
@ -186,8 +188,8 @@ void AsynchronousReadIndirectBufferFromRemoteFS::appendToPrefetchLog(FilesystemP
.reader_id = current_reader_id,
};
if (auto prefetch_log = Context::getGlobalContextInstance()->getFilesystemReadPrefetchesLog())
prefetch_log->add(elem);
if (prefetches_log)
prefetches_log->add(elem);
}
@ -335,7 +337,7 @@ off_t AsynchronousReadIndirectBufferFromRemoteFS::seek(off_t offset, int whence)
if (impl->initialized()
&& read_until_position && new_pos < *read_until_position
&& new_pos > file_offset_of_buffer_end
&& new_pos < file_offset_of_buffer_end + min_bytes_for_seek)
&& new_pos < file_offset_of_buffer_end + read_settings.remote_read_min_bytes_for_seek)
{
ProfileEvents::increment(ProfileEvents::RemoteFSLazySeeks);
bytes_to_ignore = new_pos - file_offset_of_buffer_end;


@ -12,6 +12,7 @@ namespace Poco { class Logger; }
namespace DB
{
struct AsyncReadCounters;
class ReadBufferFromRemoteFSGather;
/**
@ -34,7 +35,8 @@ public:
explicit AsynchronousReadIndirectBufferFromRemoteFS(
IAsynchronousReader & reader_, const ReadSettings & settings_,
std::shared_ptr<ReadBufferFromRemoteFSGather> impl_,
size_t min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE);
std::shared_ptr<AsyncReadCounters> async_read_counters_,
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log_);
~AsynchronousReadIndirectBufferFromRemoteFS() override;
@ -83,8 +85,6 @@ private:
Memory<> prefetch_buffer;
size_t min_bytes_for_seek;
std::string query_id;
std::string current_reader_id;
@ -95,6 +95,9 @@ private:
Poco::Logger * log;
std::shared_ptr<AsyncReadCounters> async_read_counters;
std::shared_ptr<FilesystemReadPrefetchesLog> prefetches_log;
struct LastPrefetchInfo
{
UInt64 submit_time = 0;


@ -48,7 +48,8 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_)
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBufferFromFileBase(settings_.remote_fs_buffer_size, nullptr, 0, file_size_)
#ifndef NDEBUG
, log(&Poco::Logger::get("CachedOnDiskReadBufferFromFile(" + source_file_path_ + ")"))
@ -62,12 +63,12 @@ CachedOnDiskReadBufferFromFile::CachedOnDiskReadBufferFromFile(
, read_until_position(read_until_position_ ? *read_until_position_ : file_size_)
, implementation_buffer_creator(implementation_buffer_creator_)
, query_id(query_id_)
, enable_logging(!query_id.empty() && settings_.enable_filesystem_cache_log)
, current_buffer_id(getRandomASCIIString(8))
, allow_seeks_after_first_read(allow_seeks_after_first_read_)
, use_external_buffer(use_external_buffer_)
, query_context_holder(cache_->getQueryContextHolder(query_id, settings_))
, is_persistent(settings_.is_file_cache_persistent)
, cache_log(cache_log_)
{
}
@ -103,7 +104,7 @@ void CachedOnDiskReadBufferFromFile::appendFilesystemCacheLog(
break;
}
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
if (cache_log)
cache_log->add(elem);
}
@ -487,7 +488,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
auto * current_file_segment = &file_segments->front();
auto completed_range = current_file_segment->range();
if (enable_logging)
if (cache_log)
appendFilesystemCacheLog(completed_range, read_type);
chassert(file_offset_of_buffer_end > completed_range.right);
@ -512,7 +513,7 @@ bool CachedOnDiskReadBufferFromFile::completeFileSegmentAndGetNext()
CachedOnDiskReadBufferFromFile::~CachedOnDiskReadBufferFromFile()
{
if (enable_logging && file_segments && !file_segments->empty())
if (cache_log && file_segments && !file_segments->empty())
{
appendFilesystemCacheLog(file_segments->front().range(), read_type);
}
@ -936,6 +937,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
if (result)
{
bool download_current_segment_succeeded = false;
if (download_current_segment)
{
chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);
@ -954,6 +956,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
|| file_segment.getCurrentWriteOffset(false) == implementation_buffer->getFileOffsetOfBufferEnd());
LOG_TEST(log, "Successfully written {} bytes", size);
download_current_segment_succeeded = true;
// The implementation_buffer is valid and positioned correctly (at file_segment->getCurrentWriteOffset()).
// Later reads for this file segment can reuse it.
@ -962,14 +965,15 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
implementation_buffer_can_be_reused = true;
}
else
{
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
LOG_TRACE(log, "Bypassing cache because writeCache method failed");
}
}
else
{
LOG_TRACE(log, "No space left in cache to reserve {} bytes, will continue without cache download", size);
if (!success)
{
read_type = ReadType::REMOTE_FS_READ_BYPASS_CACHE;
chassert(file_segment.state() == FileSegment::State::PARTIALLY_DOWNLOADED_NO_CONTINUATION);
}
}
@ -990,6 +994,8 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
file_offset_of_buffer_end += size;
if (download_current_segment && download_current_segment_succeeded)
chassert(file_segment.getCurrentWriteOffset(false) >= file_offset_of_buffer_end);
chassert(file_offset_of_buffer_end <= read_until_position);
}


@ -32,7 +32,8 @@ public:
size_t file_size_,
bool allow_seeks_after_first_read_,
bool use_external_buffer_,
std::optional<size_t> read_until_position_ = std::nullopt);
std::optional<size_t> read_until_position_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~CachedOnDiskReadBufferFromFile() override;
@ -137,7 +138,6 @@ private:
String last_caller_id;
String query_id;
bool enable_logging = false;
String current_buffer_id;
bool allow_seeks_after_first_read;
@ -148,6 +148,8 @@ private:
FileCache::QueryContextHolderPtr query_context_holder;
bool is_persistent;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}


@ -153,27 +153,27 @@ FileSegment & FileSegmentRangeWriter::allocateFileSegment(size_t offset, FileSeg
void FileSegmentRangeWriter::appendFilesystemCacheLog(const FileSegment & file_segment)
{
if (cache_log)
if (!cache_log)
return;
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
FilesystemCacheLogElement elem
{
auto file_segment_range = file_segment.range();
size_t file_segment_right_bound = file_segment_range.left + file_segment.getDownloadedSize(false) - 1;
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
.query_id = query_id,
.source_file_path = source_path,
.file_segment_range = { file_segment_range.left, file_segment_right_bound },
.requested_range = {},
.cache_type = FilesystemCacheLogElement::CacheType::WRITE_THROUGH_CACHE,
.file_segment_size = file_segment_range.size(),
.read_from_cache_attempted = false,
.read_buffer_id = {},
.profile_counters = nullptr,
};
cache_log->add(elem);
}
cache_log->add(elem);
}
void FileSegmentRangeWriter::completeFileSegment(FileSegment & file_segment)

View File

@ -8,30 +8,29 @@
#include <iostream>
#include <base/hex.h>
#include <Interpreters/FilesystemCacheLog.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_)
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_)
: ReadBuffer(nullptr, 0)
, read_buffer_creator(std::move(read_buffer_creator_))
, blobs_to_read(blobs_to_read_)
, settings(settings_)
, current_object(!blobs_to_read_.empty() ? blobs_to_read_.front() : throw Exception(ErrorCodes::LOGICAL_ERROR, "Attempt to read zero number of objects"))
, query_id(CurrentThread::isInitialized() && CurrentThread::get().getQueryContext() != nullptr ? CurrentThread::getQueryId() : "")
, log(&Poco::Logger::get("ReadBufferFromRemoteFSGather"))
, enable_cache_log(!query_id.empty() && settings.enable_filesystem_cache_log)
{
if (cache_log_ && settings.enable_filesystem_cache_log)
cache_log = cache_log_;
if (!blobs_to_read.empty())
current_object = blobs_to_read.front();
with_cache = settings.remote_fs_cache
&& settings.enable_filesystem_cache
&& (!query_id.empty() || settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache);
@ -39,7 +38,7 @@ ReadBufferFromRemoteFSGather::ReadBufferFromRemoteFSGather(
SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(const StoredObject & object)
{
if (current_buf != nullptr && !with_cache && enable_cache_log)
if (current_buf != nullptr && !with_cache)
{
appendFilesystemCacheLog();
}
@ -64,7 +63,8 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
object.bytes_size,
/* allow_seeks */false,
/* use_external_buffer */true,
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt);
read_until_position ? std::optional<size_t>(read_until_position) : std::nullopt,
cache_log);
}
return current_read_buffer_creator();
@ -72,7 +72,9 @@ SeekableReadBufferPtr ReadBufferFromRemoteFSGather::createImplementationBuffer(c
void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
{
chassert(!current_object.remote_path.empty());
if (!cache_log || current_object.remote_path.empty())
return;
FilesystemCacheLogElement elem
{
.event_time = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now()),
@ -83,9 +85,7 @@ void ReadBufferFromRemoteFSGather::appendFilesystemCacheLog()
.file_segment_size = total_bytes_read_from_current_file,
.read_from_cache_attempted = false,
};
if (auto cache_log = Context::getGlobalContextInstance()->getFilesystemCacheLog())
cache_log->add(elem);
cache_log->add(elem);
}
IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data, size_t size, size_t offset, size_t ignore)
@ -99,9 +99,7 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
file_offset_of_buffer_end = offset;
bytes_to_ignore = ignore;
assert(!bytes_to_ignore || initialized());
auto result = nextImpl();
const auto result = nextImpl();
if (result)
return { working_buffer.size(), BufferBase::offset(), nullptr };
@ -111,6 +109,9 @@ IAsynchronousReader::Result ReadBufferFromRemoteFSGather::readInto(char * data,
void ReadBufferFromRemoteFSGather::initialize()
{
if (blobs_to_read.empty())
return;
/// One clickhouse file can be split into multiple files in remote fs.
auto current_buf_offset = file_offset_of_buffer_end;
for (size_t i = 0; i < blobs_to_read.size(); ++i)
@ -144,21 +145,14 @@ bool ReadBufferFromRemoteFSGather::nextImpl()
if (!current_buf)
initialize();
/// If current buffer has remaining data - use it.
if (current_buf)
{
if (readImpl())
return true;
}
else
{
if (!current_buf)
return false;
}
if (readImpl())
return true;
if (!moveToNextBuffer())
{
return false;
}
return readImpl();
}
@ -274,10 +268,8 @@ size_t ReadBufferFromRemoteFSGather::getImplementationBufferOffset() const
ReadBufferFromRemoteFSGather::~ReadBufferFromRemoteFSGather()
{
if (!with_cache && enable_cache_log)
{
if (!with_cache)
appendFilesystemCacheLog();
}
}
}

View File

@ -25,7 +25,8 @@ public:
ReadBufferFromRemoteFSGather(
ReadBufferCreator && read_buffer_creator_,
const StoredObjects & blobs_to_read_,
const ReadSettings & settings_);
const ReadSettings & settings_,
std::shared_ptr<FilesystemCacheLog> cache_log_);
~ReadBufferFromRemoteFSGather() override;
@ -93,7 +94,7 @@ private:
size_t total_bytes_read_from_current_file = 0;
bool enable_cache_log = false;
std::shared_ptr<FilesystemCacheLog> cache_log;
};
}

View File

@ -11,7 +11,6 @@
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <IO/SeekableReadBuffer.h>
#include <IO/AsyncReadCounters.h>
#include <Interpreters/Context.h>
#include <base/getThreadId.h>
#include <future>
@ -75,17 +74,11 @@ std::future<IAsynchronousReader::Result> ThreadPoolRemoteFSReader::submit(Reques
return scheduleFromThreadPool<Result>([request]() -> Result
{
CurrentMetrics::Increment metric_increment{CurrentMetrics::RemoteRead};
std::optional<AsyncReadIncrement> increment;
if (CurrentThread::isInitialized())
{
auto query_context = CurrentThread::get().getQueryContext();
if (query_context)
increment.emplace(query_context->getAsyncReadCounters());
}
auto * remote_fs_fd = assert_cast<RemoteFSFileDescriptor *>(request.descriptor.get());
auto async_read_counters = remote_fs_fd->getReadCounters();
std::optional<AsyncReadIncrement> increment = async_read_counters ? std::optional<AsyncReadIncrement>(async_read_counters) : std::nullopt;
auto watch = std::make_unique<Stopwatch>(CLOCK_MONOTONIC);
Result result = remote_fs_fd->readInto(request.buf, request.size, request.offset, request.ignore);
watch->stop();
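A minimal sketch of how a caller now wires the counters in, given the new descriptor constructor above; the buffer variable and the default construction of AsyncReadCounters are assumptions for illustration, not taken from this diff:
auto counters = std::make_shared<AsyncReadCounters>();                              /// assumed constructible like this for the sketch
auto fd = std::make_shared<RemoteFSFileDescriptor>(impl_buffer, counters);          /// impl_buffer: some ReadBuffer &, assumed
/// The descriptor now travels with the request, so the worker lambda above retrieves the
/// counters via fd->getReadCounters() instead of CurrentThread::get().getQueryContext().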

View File

@ -8,6 +8,8 @@
namespace DB
{
struct AsyncReadCounters;
class ThreadPoolRemoteFSReader : public IAsynchronousReader
{
public:
@ -24,12 +26,19 @@ private:
class RemoteFSFileDescriptor : public IAsynchronousReader::IFileDescriptor
{
public:
explicit RemoteFSFileDescriptor(ReadBuffer & reader_) : reader(reader_) { }
explicit RemoteFSFileDescriptor(
ReadBuffer & reader_,
std::shared_ptr<AsyncReadCounters> async_read_counters_)
: reader(reader_)
, async_read_counters(async_read_counters_) {}
IAsynchronousReader::Result readInto(char * data, size_t size, size_t offset, size_t ignore = 0);
std::shared_ptr<AsyncReadCounters> getReadCounters() const { return async_read_counters; }
private:
ReadBuffer & reader;
std::shared_ptr<AsyncReadCounters> async_read_counters;
};
}

View File

@ -5,7 +5,9 @@
#include <IO/AsynchronousReadBufferFromFile.h>
#include <Disks/IO/IOUringReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/SynchronousReader.h>
#include <IO/AsynchronousReader.h>
#include <Common/ProfileEvents.h>
#include "config.h"
@ -27,7 +29,6 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
const std::string & filename,
const ReadSettings & settings,
@ -119,11 +120,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_fake_async)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,
@ -137,11 +134,7 @@ std::unique_ptr<ReadBufferFromFileBase> createReadBufferFromFileBase(
}
else if (settings.local_fs_method == LocalFSReadMethod::pread_threadpool)
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
auto & reader = context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
auto & reader = getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER);
res = std::make_unique<AsynchronousReadBufferFromFileWithDescriptorsCache>(
reader,
settings.priority,

View File

@ -0,0 +1,76 @@
#include <Common/ErrorCodes.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <IO/AsynchronousReader.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <Poco/Util/Application.h>
#include <IO/SynchronousReader.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/ThreadPoolReader.h>
#ifndef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
#include <Interpreters/Context.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type)
{
#ifdef CLICKHOUSE_PROGRAM_STANDALONE_BUILD
const auto & config = Poco::Util::Application::instance().config();
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
static auto asynchronous_remote_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
static auto asynchronous_local_fs_reader = createThreadPoolReader(type, config);
return *asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
static auto synchronous_local_fs_reader = createThreadPoolReader(type, config);
return *synchronous_local_fs_reader;
}
}
#else
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(type);
#endif
}
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
auto pool_size = config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
return std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::make_unique<SynchronousReader>();
}
}
}
}

View File

@ -0,0 +1,23 @@
#pragma once
namespace Poco::Util { class AbstractConfiguration; }
namespace DB
{
class IAsynchronousReader;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type);
std::unique_ptr<IAsynchronousReader> createThreadPoolReader(
FilesystemReaderType type,
const Poco::Util::AbstractConfiguration & config);
}
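A minimal usage sketch for the new free function declared above; it only restates call sites that appear elsewhere in this diff:
#include <Disks/IO/getThreadPoolReader.h>
void submitSomeRemoteRead()
{
    /// In a standalone build the reader is created lazily from the application config;
    /// otherwise it is taken from the global Context (see getThreadPoolReader.cpp above).
    auto & reader = DB::getThreadPoolReader(DB::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
    (void)reader;   /// request construction and reader.submit(...) are omitted in this sketch
}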

View File

@ -10,6 +10,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/ObjectStorages/AzureBlobStorage/AzureBlobStorageAuth.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
@ -86,6 +87,7 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto settings_ptr = settings.get();
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[this, settings_ptr, disk_read_settings]
@ -104,12 +106,16 @@ std::unique_ptr<ReadBufferFromFileBase> AzureObjectStorage::readObjects( /// NOL
auto reader_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (disk_read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, disk_read_settings, std::move(reader_impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(reader_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -4,6 +4,7 @@
#include <IO/BoundedReadBuffer.h>
#include <Disks/IO/CachedOnDiskWriteBufferFromFile.h>
#include <Disks/IO/CachedOnDiskReadBufferFromFile.h>
#include <Interpreters/Context.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileCacheFactory.h>
#include <Common/CurrentThread.h>

View File

@ -74,7 +74,7 @@ std::unique_ptr<ReadBufferFromFileBase> HDFSObjectStorage::readObjects( /// NOLI
hdfs_uri, hdfs_path, config, disk_read_settings, /* read_until_position */0, /* use_external_buffer */true);
};
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings);
auto hdfs_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), objects, disk_read_settings, nullptr);
auto buf = std::make_unique<ReadIndirectBufferFromRemoteFS>(std::move(hdfs_impl), read_settings);
return std::make_unique<SeekAvoidingReadBuffer>(std::move(buf), settings->min_bytes_for_seek);
}

View File

@ -26,15 +26,6 @@ void IObjectStorage::getDirectoryContents(const std::string &,
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "getDirectoryContents() is not supported");
}
IAsynchronousReader & IObjectStorage::getThreadPoolReader()
{
auto context = Context::getGlobalContextInstance();
if (!context)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Global context not initialized");
return context->getThreadPoolReader(Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
}
ThreadPool & IObjectStorage::getThreadPoolWriter()
{
auto context = Context::getGlobalContextInstance();

View File

@ -157,8 +157,6 @@ public:
virtual const std::string & getCacheName() const;
static IAsynchronousReader & getThreadPoolReader();
static ThreadPool & getThreadPoolWriter();
virtual void shutdown() = 0;

View File

@ -51,6 +51,7 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
std::optional<size_t> file_size) const
{
auto modified_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto read_buffer_creator =
[=] (const std::string & file_path, size_t /* read_until_position */)
-> std::unique_ptr<ReadBufferFromFileBase>
@ -59,14 +60,18 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObjects( /// NOL
};
auto impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator), objects, modified_settings);
std::move(read_buffer_creator), objects, modified_settings,
global_context->getFilesystemCacheLog());
/// We use `remote_fs_method` (not `local_fs_method`) because we are about to use
/// AsynchronousReadIndirectBufferFromRemoteFS, which is governed by the remote_fs_* settings.
if (modified_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, modified_settings, std::move(impl));
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, modified_settings, std::move(impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -98,6 +98,7 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
std::optional<size_t>) const
{
ReadSettings disk_read_settings = patchSettings(read_settings);
auto global_context = Context::getGlobalContextInstance();
auto settings_ptr = s3_settings.get();
@ -121,13 +122,16 @@ std::unique_ptr<ReadBufferFromFileBase> S3ObjectStorage::readObjects( /// NOLINT
auto s3_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
objects,
disk_read_settings);
disk_read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = getThreadPoolReader();
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, disk_read_settings, std::move(s3_impl), disk_read_settings.remote_read_min_bytes_for_seek);
reader, disk_read_settings, std::move(s3_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -13,6 +13,7 @@
#include <Disks/IO/ReadBufferFromRemoteFSGather.h>
#include <Disks/IO/ReadBufferFromWebServer.h>
#include <Disks/IO/ThreadPoolRemoteFSReader.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/MergeTree/MergeTreeData.h>
@ -179,12 +180,20 @@ std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObject( /// NOLINT
read_until_position);
};
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(std::move(read_buffer_creator), StoredObjects{object}, read_settings);
auto global_context = Context::getGlobalContextInstance();
auto web_impl = std::make_unique<ReadBufferFromRemoteFSGather>(
std::move(read_buffer_creator),
StoredObjects{object},
read_settings,
global_context->getFilesystemCacheLog());
if (read_settings.remote_fs_method == RemoteFSReadMethod::threadpool)
{
auto & reader = IObjectStorage::getThreadPoolReader();
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(reader, read_settings, std::move(web_impl), min_bytes_for_seek);
auto & reader = global_context->getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER);
return std::make_unique<AsynchronousReadIndirectBufferFromRemoteFS>(
reader, read_settings, std::move(web_impl),
global_context->getAsyncReadCounters(),
global_context->getFilesystemReadPrefetchesLog());
}
else
{

View File

@ -808,23 +808,20 @@ struct ImplBLAKE3
static constexpr auto name = "BLAKE3";
enum { length = 32 };
#if !USE_BLAKE3
[[noreturn]] static void apply(const char * begin, const size_t size, unsigned char* out_char_data)
#if !USE_BLAKE3
[[noreturn]] static void apply(const char * /*begin*/, const size_t /*size*/, unsigned char * /*out_char_data*/)
{
UNUSED(begin);
UNUSED(size);
UNUSED(out_char_data);
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "BLAKE3 is not available. Rust code or BLAKE3 itself may be disabled.");
}
#else
#else
static void apply(const char * begin, const size_t size, unsigned char* out_char_data)
{
#if defined(MEMORY_SANITIZER)
# if defined(MEMORY_SANITIZER)
auto err_msg = blake3_apply_shim_msan_compat(begin, safe_cast<uint32_t>(size), out_char_data);
__msan_unpoison(out_char_data, length);
#else
# else
auto err_msg = blake3_apply_shim(begin, safe_cast<uint32_t>(size), out_char_data);
#endif
# endif
if (err_msg != nullptr)
{
auto err_st = std::string(err_msg);
@ -832,7 +829,7 @@ struct ImplBLAKE3
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function returned error message: {}", err_st);
}
}
#endif
#endif
};
template <typename Impl>

View File

@ -228,6 +228,8 @@ REGISTER_FUNCTION(ExtractKeyValuePairs)
```)")
);
factory.registerAlias("str_to_map", NameExtractKeyValuePairs::name, FunctionFactory::CaseInsensitive);
factory.registerAlias("mapFromString", NameExtractKeyValuePairs::name);
}
}

View File

@ -1,5 +1,6 @@
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
@ -20,7 +21,6 @@ namespace DB
{
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int ARGUMENT_OUT_OF_BOUND;
}
@ -28,13 +28,7 @@ namespace ErrorCodes
namespace
{
/// A helper function to simplify comparisons of valid YYYY-MM-DD values for <,>,=
inline constexpr Int64 YearMonthDayToSingleInt(Int64 year, Int64 month, Int64 day)
{
return year * 512 + month * 32 + day;
}
/// Common logic to handle numeric arguments like year, month, day, hour, minute, second
/// Functions common to makeDate, makeDate32, makeDateTime, makeDateTime64
class FunctionWithNumericParamsBase : public IFunction
{
public:
@ -49,36 +43,23 @@ public:
bool useDefaultImplementationForConstants() const override { return true; }
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
protected:
template <class ArgumentNames>
void checkRequiredArguments(const ColumnsWithTypeAndName & arguments, const ArgumentNames & argument_names, const size_t optional_argument_count) const
{
if (arguments.size() < argument_names.size() || arguments.size() > argument_names.size() + optional_argument_count)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} requires {} to {} arguments, but {} given",
getName(), argument_names.size(), argument_names.size() + optional_argument_count, arguments.size());
for (size_t i = 0; i < argument_names.size(); ++i)
{
DataTypePtr argument_type = arguments[i].type;
if (!isNumber(argument_type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument '{}' for function {} must be a number", std::string(argument_names[i]), getName());
}
}
template <class ArgumentNames>
void convertRequiredArguments(const ColumnsWithTypeAndName & arguments, const ArgumentNames & argument_names, Columns & converted_arguments) const
Columns convertMandatoryArguments(const ColumnsWithTypeAndName & arguments, const ArgumentNames & argument_names) const
{
Columns converted_arguments;
const DataTypePtr converted_argument_type = std::make_shared<DataTypeFloat32>();
converted_arguments.clear();
converted_arguments.reserve(arguments.size());
for (size_t i = 0; i < argument_names.size(); ++i)
{
ColumnPtr argument_column = castColumn(arguments[i], converted_argument_type);
argument_column = argument_column->convertToFullColumnIfConst();
converted_arguments.push_back(argument_column);
}
return converted_arguments;
}
};
@ -87,7 +68,8 @@ template <typename Traits>
class FunctionMakeDate : public FunctionWithNumericParamsBase
{
private:
static constexpr std::array argument_names = {"year", "month", "day"};
static constexpr std::array mandatory_argument_names_year_month_day = {"year", "month", "day"};
static constexpr std::array mandatory_argument_names_year_dayofyear = {"year", "dayofyear"};
public:
static constexpr auto name = Traits::name;
@ -96,56 +78,103 @@ public:
String getName() const override { return name; }
bool isVariadic() const override { return false; }
size_t getNumberOfArguments() const override { return argument_names.size(); }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
checkRequiredArguments(arguments, argument_names, 0);
const bool isYearMonthDayVariant = (arguments.size() == 3);
if (isYearMonthDayVariant)
{
FunctionArgumentDescriptors args{
{mandatory_argument_names_year_month_day[0], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names_year_month_day[1], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names_year_month_day[2], &isNumber<IDataType>, nullptr, "Number"}
};
validateFunctionArgumentTypes(*this, arguments, args);
}
else
{
FunctionArgumentDescriptors args{
{mandatory_argument_names_year_dayofyear[0], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names_year_dayofyear[1], &isNumber<IDataType>, nullptr, "Number"}
};
validateFunctionArgumentTypes(*this, arguments, args);
}
return std::make_shared<typename Traits::ReturnDataType>();
}
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr &, size_t input_rows_count) const override
{
const bool isYearMonthDayVariant = (arguments.size() == 3);
Columns converted_arguments;
convertRequiredArguments(arguments, argument_names, converted_arguments);
if (isYearMonthDayVariant)
converted_arguments = convertMandatoryArguments(arguments, mandatory_argument_names_year_month_day);
else
converted_arguments = convertMandatoryArguments(arguments, mandatory_argument_names_year_dayofyear);
auto res_column = Traits::ReturnDataType::ColumnType::create(input_rows_count);
auto & result_data = res_column->getData();
const auto & year_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[0]).getData();
const auto & month_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[1]).getData();
const auto & day_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[2]).getData();
const auto & date_lut = DateLUT::instance();
const Int32 max_days_since_epoch = date_lut.makeDayNum(Traits::MAX_DATE[0], Traits::MAX_DATE[1], Traits::MAX_DATE[2]);
for (size_t i = 0; i < input_rows_count; ++i)
if (isYearMonthDayVariant)
{
const auto year = year_data[i];
const auto month = month_data[i];
const auto day = day_data[i];
const auto & year_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[0]).getData();
const auto & month_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[1]).getData();
const auto & day_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[2]).getData();
Int32 day_num = 0;
if (year >= Traits::MIN_YEAR &&
year <= Traits::MAX_YEAR &&
month >= 1 && month <= 12 &&
day >= 1 && day <= 31 &&
YearMonthDayToSingleInt(static_cast<Int64>(year), static_cast<Int64>(month), static_cast<Int64>(day)) <= Traits::MAX_DATE)
for (size_t i = 0; i < input_rows_count; ++i)
{
day_num = date_lut.makeDayNum(static_cast<Int16>(year), static_cast<UInt8>(month), static_cast<UInt8>(day));
}
const auto year = year_data[i];
const auto month = month_data[i];
const auto day = day_data[i];
result_data[i] = day_num;
Int32 day_num = 0;
if (year >= Traits::MIN_YEAR &&
year <= Traits::MAX_YEAR &&
month >= 1 && month <= 12 &&
day >= 1 && day <= 31)
{
Int32 days_since_epoch = date_lut.makeDayNum(static_cast<Int16>(year), static_cast<UInt8>(month), static_cast<UInt8>(day));
if (days_since_epoch <= max_days_since_epoch)
day_num = days_since_epoch;
}
result_data[i] = day_num;
}
}
else
{
const auto & year_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[0]).getData();
const auto & dayofyear_data = typeid_cast<const ColumnFloat32 &>(*converted_arguments[1]).getData();
for (size_t i = 0; i < input_rows_count; ++i)
{
const auto year = year_data[i];
const auto dayofyear = dayofyear_data[i];
Int32 day_num = 0;
if (year >= Traits::MIN_YEAR &&
year <= Traits::MAX_YEAR &&
dayofyear >= 1 && dayofyear <= 365)
{
Int32 days_since_epoch = date_lut.makeDayNum(static_cast<Int16>(year), 1, 1) + static_cast<Int32>(dayofyear) - 1;
if (days_since_epoch <= max_days_since_epoch)
day_num = days_since_epoch;
}
result_data[i] = day_num;
}
}
return res_column;
}
};
/// makeDate(year, month, day)
struct MakeDateTraits
{
static constexpr auto name = "makeDate";
@ -154,10 +183,9 @@ struct MakeDateTraits
static constexpr auto MIN_YEAR = 1970;
static constexpr auto MAX_YEAR = 2149;
/// This date has the maximum day number that fits in 16-bit uint
static constexpr auto MAX_DATE = YearMonthDayToSingleInt(MAX_YEAR, 6, 6);
static constexpr std::array MAX_DATE = {MAX_YEAR, 6, 6};
};
/// makeDate32(year, month, day)
struct MakeDate32Traits
{
static constexpr auto name = "makeDate32";
@ -165,30 +193,14 @@ struct MakeDate32Traits
static constexpr auto MIN_YEAR = 1900;
static constexpr auto MAX_YEAR = 2299;
static constexpr auto MAX_DATE = YearMonthDayToSingleInt(MAX_YEAR, 12, 31);
static constexpr std::array MAX_DATE = {MAX_YEAR, 12, 31};
};
/// Common implementation for makeDateTime, makeDateTime64
class FunctionMakeDateTimeBase : public FunctionWithNumericParamsBase
{
protected:
static constexpr std::array argument_names = {"year", "month", "day", "hour", "minute", "second"};
public:
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
protected:
void checkRequiredArguments(const ColumnsWithTypeAndName & arguments, const size_t optional_argument_count) const
{
FunctionWithNumericParamsBase::checkRequiredArguments(arguments, argument_names, optional_argument_count);
}
void convertRequiredArguments(const ColumnsWithTypeAndName & arguments, Columns & converted_arguments) const
{
FunctionWithNumericParamsBase::convertRequiredArguments(arguments, argument_names, converted_arguments);
}
static constexpr std::array mandatory_argument_names = {"year", "month", "day", "hour", "minute", "second"};
template <typename T>
static Int64 dateTime(T year, T month, T day_of_month, T hour, T minute, T second, const DateLUTImpl & lut)
@ -235,7 +247,7 @@ protected:
class FunctionMakeDateTime : public FunctionMakeDateTimeBase
{
private:
static constexpr std::array<const char*, 1> optional_argument_names = {"timezone"};
static constexpr std::array optional_argument_names = {"timezone"};
public:
static constexpr auto name = "makeDateTime";
@ -246,11 +258,24 @@ public:
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
checkRequiredArguments(arguments, optional_argument_names.size());
FunctionArgumentDescriptors mandatory_args{
{mandatory_argument_names[0], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[1], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[2], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[3], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[4], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[5], &isNumber<IDataType>, nullptr, "Number"}
};
FunctionArgumentDescriptors optional_args{
{optional_argument_names[0], &isString<IDataType>, nullptr, "String"}
};
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
/// Optional timezone argument
std::string timezone;
if (arguments.size() == argument_names.size() + 1)
if (arguments.size() == mandatory_argument_names.size() + 1)
timezone = extractTimezone(arguments.back());
return std::make_shared<DataTypeDateTime>(timezone);
@ -260,11 +285,10 @@ public:
{
/// Optional timezone argument
std::string timezone;
if (arguments.size() == argument_names.size() + 1)
if (arguments.size() == mandatory_argument_names.size() + 1)
timezone = extractTimezone(arguments.back());
Columns converted_arguments;
convertRequiredArguments(arguments, converted_arguments);
Columns converted_arguments = convertMandatoryArguments(arguments, mandatory_argument_names);
auto res_column = ColumnDateTime::create(input_rows_count);
auto & result_data = res_column->getData();
@ -300,11 +324,11 @@ public:
}
};
/// makeDateTime64(year, month, day, hour, minute, second, [fraction], [precision], [timezone])
/// makeDateTime64(year, month, day, hour, minute, second[, fraction[, precision[, timezone]]])
class FunctionMakeDateTime64 : public FunctionMakeDateTimeBase
{
private:
static constexpr std::array<const char*, 3> optional_argument_names = {"fraction", "precision", "timezone"};
static constexpr std::array optional_argument_names = {"fraction", "precision", "timezone"};
static constexpr UInt8 DEFAULT_PRECISION = 3;
public:
@ -316,11 +340,26 @@ public:
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{
checkRequiredArguments(arguments, optional_argument_names.size());
FunctionArgumentDescriptors mandatory_args{
{mandatory_argument_names[0], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[1], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[2], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[3], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[4], &isNumber<IDataType>, nullptr, "Number"},
{mandatory_argument_names[5], &isNumber<IDataType>, nullptr, "Number"}
};
if (arguments.size() >= argument_names.size() + 1)
FunctionArgumentDescriptors optional_args{
{optional_argument_names[0], &isNumber<IDataType>, nullptr, "Number"},
{optional_argument_names[1], &isNumber<IDataType>, nullptr, "Number"},
{optional_argument_names[2], &isString<IDataType>, nullptr, "String"}
};
validateFunctionArgumentTypes(*this, arguments, mandatory_args, optional_args);
if (arguments.size() >= mandatory_argument_names.size() + 1)
{
const auto& fraction_argument = arguments[argument_names.size()];
const auto& fraction_argument = arguments[mandatory_argument_names.size()];
if (!isNumber(fraction_argument.type))
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT,
"Argument 'fraction' for function {} must be a number", getName());
@ -328,12 +367,12 @@ public:
/// Optional precision argument
Int64 precision = DEFAULT_PRECISION;
if (arguments.size() >= argument_names.size() + 2)
precision = extractPrecision(arguments[argument_names.size() + 1]);
if (arguments.size() >= mandatory_argument_names.size() + 2)
precision = extractPrecision(arguments[mandatory_argument_names.size() + 1]);
/// Optional timezone argument
std::string timezone;
if (arguments.size() == argument_names.size() + 3)
if (arguments.size() == mandatory_argument_names.size() + 3)
timezone = extractTimezone(arguments.back());
return std::make_shared<DataTypeDateTime64>(precision, timezone);
@ -343,22 +382,21 @@ public:
{
/// Optional precision argument
Int64 precision = DEFAULT_PRECISION;
if (arguments.size() >= argument_names.size() + 2)
precision = extractPrecision(arguments[argument_names.size() + 1]);
if (arguments.size() >= mandatory_argument_names.size() + 2)
precision = extractPrecision(arguments[mandatory_argument_names.size() + 1]);
/// Optional timezone argument
std::string timezone;
if (arguments.size() == argument_names.size() + 3)
if (arguments.size() == mandatory_argument_names.size() + 3)
timezone = extractTimezone(arguments.back());
Columns converted_arguments;
convertRequiredArguments(arguments, converted_arguments);
Columns converted_arguments = convertMandatoryArguments(arguments, mandatory_argument_names);
/// Optional fraction argument
const ColumnVector<Float64>::Container * fraction_data = nullptr;
if (arguments.size() >= argument_names.size() + 1)
if (arguments.size() >= mandatory_argument_names.size() + 1)
{
ColumnPtr fraction_column = castColumn(arguments[argument_names.size()], std::make_shared<DataTypeFloat64>());
ColumnPtr fraction_column = castColumn(arguments[mandatory_argument_names.size()], std::make_shared<DataTypeFloat64>());
fraction_column = fraction_column->convertToFullColumnIfConst();
converted_arguments.push_back(fraction_column);
fraction_data = &typeid_cast<const ColumnFloat64 &>(*converted_arguments[6]).getData();
@ -439,7 +477,7 @@ private:
REGISTER_FUNCTION(MakeDate)
{
factory.registerFunction<FunctionMakeDate<MakeDateTraits>>();
factory.registerFunction<FunctionMakeDate<MakeDateTraits>>({}, FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionMakeDate<MakeDate32Traits>>();
factory.registerFunction<FunctionMakeDateTime>();
factory.registerFunction<FunctionMakeDateTime64>();
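A condensed restatement of the new year/dayofyear branch above; the helper name and the hard-coded 1970..2149 bounds (MakeDateTraits::MIN_YEAR/MAX_YEAR) are for illustration only:
/// makeDate(2023, 32) -> the day number of 2023-02-01, i.e. makeDayNum(2023, 1, 1) + 32 - 1.
/// Out-of-range input yields 0 (the epoch), just like the year/month/day branch.
Int32 dayNumFromYearAndDayOfYear(Float32 year, Float32 dayofyear,
                                 const DateLUTImpl & lut, Int32 max_days_since_epoch)
{
    if (year < 1970 || year > 2149 || dayofyear < 1 || dayofyear > 365)
        return 0;
    Int32 days_since_epoch = lut.makeDayNum(static_cast<Int16>(year), 1, 1)
        + static_cast<Int32>(dayofyear) - 1;
    return days_since_epoch <= max_days_since_epoch ? days_since_epoch : 0;
}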

View File

@ -112,6 +112,22 @@ std::unique_ptr<Client> Client::create(const Client & other)
return std::unique_ptr<Client>(new Client(other));
}
namespace
{
ProviderType deduceProviderType(const std::string & url)
{
if (url.find(".amazonaws.com") != std::string::npos)
return ProviderType::AWS;
if (url.find("storage.googleapis.com") != std::string::npos)
return ProviderType::GCS;
return ProviderType::UNKNOWN;
}
}
Client::Client(
size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
@ -128,9 +144,28 @@ Client::Client(
endpoint_provider->GetBuiltInParameters().GetParameter("Region").GetString(explicit_region);
endpoint_provider->GetBuiltInParameters().GetParameter("Endpoint").GetString(initial_endpoint);
provider_type = getProviderTypeFromURL(initial_endpoint);
provider_type = deduceProviderType(initial_endpoint);
LOG_TRACE(log, "Provider type: {}", toString(provider_type));
if (provider_type == ProviderType::GCS)
{
/// GCS can operate in 2 modes for header and query param names:
/// - with both x-amz and x-goog prefixes allowed (but different prefixes cannot be mixed in the same request)
/// - with only the x-goog prefix
/// The first mode is allowed only with HMAC (or unsigned requests), so when we
/// find credential keys we can simply behave as if the underlying storage is S3;
/// otherwise, we need to be aware that we are making requests to GCS
/// and replace all headers with a valid prefix when needed
if (credentials_provider)
{
auto credentials = credentials_provider->GetAWSCredentials();
if (credentials.IsEmpty())
api_mode = ApiMode::GCS;
}
}
LOG_TRACE(log, "API mode: {}", toString(api_mode));
detect_region = provider_type == ProviderType::AWS && explicit_region == Aws::Region::AWS_GLOBAL;
cache = std::make_shared<ClientCache>();
@ -208,7 +243,7 @@ Model::HeadObjectOutcome Client::HeadObject(const HeadObjectRequest & request) c
{
const auto & bucket = request.GetBucket();
request.setProviderType(provider_type);
request.setApiMode(api_mode);
if (auto region = getRegionForBucket(bucket); !region.empty())
{
@ -348,7 +383,7 @@ std::invoke_result_t<RequestFn, RequestType>
Client::doRequest(const RequestType & request, RequestFn request_fn) const
{
const auto & bucket = request.GetBucket();
request.setProviderType(provider_type);
request.setApiMode(api_mode);
if (auto region = getRegionForBucket(bucket); !region.empty())
{
@ -421,9 +456,23 @@ Client::doRequest(const RequestType & request, RequestFn request_fn) const
throw Exception(ErrorCodes::TOO_MANY_REDIRECTS, "Too many redirects");
}
ProviderType Client::getProviderType() const
bool Client::supportsMultiPartCopy() const
{
return provider_type;
return provider_type != ProviderType::GCS;
}
void Client::BuildHttpRequest(const Aws::AmazonWebServiceRequest& request,
const std::shared_ptr<Aws::Http::HttpRequest>& httpRequest) const
{
Aws::S3::S3Client::BuildHttpRequest(request, httpRequest);
if (api_mode == ApiMode::GCS)
{
/// some GCS requests don't like S3 specific headers that the client sets
httpRequest->DeleteHeader("x-amz-api-version");
httpRequest->DeleteHeader("amz-sdk-invocation-id");
httpRequest->DeleteHeader("amz-sdk-request");
}
}
std::string Client::getRegionForBucket(const std::string & bucket, bool force_detect) const
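The constructor change above makes two separate decisions: the provider is deduced from the endpoint URL, and the API mode additionally depends on whether HMAC credentials are present. A hypothetical condensed helper, only to restate that decision (ProviderType/ApiMode are the enums touched later in this diff):
ApiMode chooseApiMode(ProviderType provider_type, bool has_hmac_credentials)
{
    /// GCS without HMAC credentials must use x-goog-prefixed headers, so the client
    /// switches to GCS mode; everything else keeps the S3-compatible (AWS) mode.
    if (provider_type == ProviderType::GCS && !has_hmac_credentials)
        return ApiMode::GCS;
    return ApiMode::AWS;
}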

View File

@ -190,7 +190,10 @@ public:
using Aws::S3::S3Client::EnableRequestProcessing;
using Aws::S3::S3Client::DisableRequestProcessing;
ProviderType getProviderType() const;
void BuildHttpRequest(const Aws::AmazonWebServiceRequest& request,
const std::shared_ptr<Aws::Http::HttpRequest>& httpRequest) const override;
bool supportsMultiPartCopy() const;
private:
Client(size_t max_redirects_,
ServerSideEncryptionKMSConfig sse_kms_config_,
@ -238,7 +241,12 @@ private:
std::string explicit_region;
mutable bool detect_region = true;
/// The provider type can determine whether some functionality is supported,
/// but for the same provider we may need to generate different headers depending on the mode.
/// E.g. GCS can work in AWS mode in some cases and accept headers with the x-amz prefix.
ProviderType provider_type{ProviderType::UNKNOWN};
ApiMode api_mode{ApiMode::AWS};
mutable std::shared_ptr<ClientCache> cache;

View File

@ -260,17 +260,6 @@ void PocoHTTPClient::makeRequestInternal(
Poco::Logger * log = &Poco::Logger::get("AWSClient");
auto uri = request.GetUri().GetURIString();
#if 0
auto provider_type = getProviderTypeFromURL(uri);
if (provider_type == ProviderType::GCS)
{
/// some GCS requests don't like S3 specific headers that the client sets
request.DeleteHeader("x-amz-api-version");
request.DeleteHeader("amz-sdk-invocation-id");
request.DeleteHeader("amz-sdk-request");
}
#endif
if (enable_s3_requests_logging)
LOG_TEST(log, "Make request to: {}", uri);

View File

@ -22,20 +22,17 @@ std::string_view toString(ProviderType provider_type)
}
}
bool supportsMultiPartCopy(ProviderType provider_type)
std::string_view toString(ApiMode api_mode)
{
return provider_type != ProviderType::GCS;
}
using enum ApiMode;
ProviderType getProviderTypeFromURL(const std::string & url)
{
if (url.find(".amazonaws.com") != std::string::npos)
return ProviderType::AWS;
if (url.find("storage.googleapis.com") != std::string::npos)
return ProviderType::GCS;
return ProviderType::UNKNOWN;
switch (api_mode)
{
case AWS:
return "AWS";
case GCS:
return "GCS";
}
}
}

View File

@ -10,6 +10,11 @@
namespace DB::S3
{
/// Provider type defines the platform containing the object we are trying to access.
/// This information is useful for determining general support for
/// some features, e.g. multipart copy, which is currently supported by AWS
/// but not by GCS.
enum class ProviderType : uint8_t
{
AWS,
@ -19,9 +24,20 @@ enum class ProviderType : uint8_t
std::string_view toString(ProviderType provider_type);
bool supportsMultiPartCopy(ProviderType provider_type);
/// Mode in which we can use the XML API.
/// This value can be the same as the provider type, but there can be a difference.
/// For example, GCS can work in both
/// AWS-compatible mode (accepts headers starting with x-amz)
/// and GCS mode (accepts only headers starting with x-goog).
/// Because GCS mode is enforced when some features are used,
/// we need to support both.
enum class ApiMode : uint8_t
{
AWS,
GCS
};
ProviderType getProviderTypeFromURL(const std::string & url);
std::string_view toString(ApiMode api_mode);
}

View File

@ -10,7 +10,7 @@ namespace DB::S3
Aws::Http::HeaderValueCollection CopyObjectRequest::GetRequestSpecificHeaders() const
{
auto headers = Model::CopyObjectRequest::GetRequestSpecificHeaders();
if (provider_type != ProviderType::GCS)
if (api_mode != ApiMode::GCS)
return headers;
/// GCS supports the same headers as S3, but with the prefix x-goog instead of x-amz
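A self-contained illustration of that prefix swap (plain standard C++, hypothetical helper, not taken from this diff):
#include <string>
/// "x-amz-copy-source" -> "x-goog-copy-source"; names without the x-amz- prefix are returned unchanged.
std::string toGCSHeaderName(std::string name)
{
    static const std::string s3_prefix = "x-amz-";
    static const std::string gcs_prefix = "x-goog-";
    if (name.compare(0, s3_prefix.size(), s3_prefix) == 0)
        name.replace(0, s3_prefix.size(), gcs_prefix);
    return name;
}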

View File

@ -62,15 +62,15 @@ public:
return uri_override;
}
void setProviderType(ProviderType provider_type_) const
void setApiMode(ApiMode api_mode_) const
{
provider_type = provider_type_;
api_mode = api_mode_;
}
protected:
mutable std::string region_override;
mutable std::optional<S3::URI> uri_override;
mutable ProviderType provider_type{ProviderType::UNKNOWN};
mutable ApiMode api_mode{ApiMode::AWS};
};
class CopyObjectRequest : public ExtendedRequest<Model::CopyObjectRequest>

View File

@ -595,7 +595,7 @@ namespace
, src_key(src_key_)
, offset(src_offset_)
, size(src_size_)
, supports_multipart_copy(S3::supportsMultiPartCopy(client_ptr_->getProviderType()))
, supports_multipart_copy(client_ptr_->supportsMultiPartCopy())
{
}

View File

@ -61,7 +61,7 @@ struct CreateFileSegmentSettings
: kind(kind_), unbounded(unbounded_) {}
};
class FileSegment : private boost::noncopyable, public std::enable_shared_from_this<FileSegment>
class FileSegment : private boost::noncopyable
{
friend struct LockedKey;
friend class FileCache; /// Because of reserved_size in tryReserve().

View File

@ -1,8 +1,6 @@
#pragma once
#include <mutex>
#include <Interpreters/Cache/FileCache_fwd.h>
#include <boost/noncopyable.hpp>
#include <map>
namespace DB
{
@ -63,6 +61,8 @@ namespace DB
*/
struct CacheGuard : private boost::noncopyable
{
/// A struct is used (not the keyword `using`) to make CacheGuard::Lock non-interchangeable with other guards' locks,
/// so we cannot pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example.
struct Lock : public std::unique_lock<std::mutex>
{
explicit Lock(std::mutex & mutex_) : std::unique_lock<std::mutex>(mutex_) {}
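The comment above explains the intent; a standalone sketch (standard C++ only, names invented for illustration) shows why the distinct struct matters:
#include <mutex>
struct CacheGuardLock : std::unique_lock<std::mutex>
{
    explicit CacheGuardLock(std::mutex & m) : std::unique_lock<std::mutex>(m) {}
};
struct KeyGuardLock : std::unique_lock<std::mutex>
{
    explicit KeyGuardLock(std::mutex & m) : std::unique_lock<std::mutex>(m) {}
};
void needsKeyLock(const KeyGuardLock &) {}
/// With `using Lock = std::unique_lock<std::mutex>;` in both guards, needsKeyLock would accept
/// either lock; with distinct structs, passing a CacheGuardLock is a compile-time error:
/// needsKeyLock(CacheGuardLock{some_mutex});   // does not compile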

View File

@ -208,10 +208,9 @@ LockedKeyPtr CacheMetadata::lockKeyMetadata(
chassert(key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY);
}
/// Not we are at a case:
/// key_state == KeyMetadata::KeyState::REMOVED
/// and KeyNotFoundPolicy == CREATE_EMPTY
/// Retry.
/// Now we are at the case when the key was removed (key_state == KeyMetadata::KeyState::REMOVED)
/// but we need to return an empty key (key_not_found_policy == KeyNotFoundPolicy::CREATE_EMPTY).
/// Retry.
return lockKeyMetadata(key, key_not_found_policy);
}
@ -241,13 +240,6 @@ void CacheMetadata::doCleanup()
{
auto lock = guard.lock();
/// Let's mention this case.
/// This metadata cleanup is delayed, so what if we marked the key as deleted and
/// put it to deletion queue, but then the same key was added to cache before
/// we actually performed this delayed removal?
/// In this case it will work fine because on each attempt to add any key to cache
/// we perform this delayed removal.
FileCacheKey cleanup_key;
while (cleanup_queue->tryPop(cleanup_key))
{

View File

@ -4169,35 +4169,8 @@ OrdinaryBackgroundExecutorPtr Context::getCommonExecutor() const
return shared->common_executor;
}
static size_t getThreadPoolReaderSizeFromConfig(Context::FilesystemReaderType type, const Poco::Util::AbstractConfiguration & config)
{
switch (type)
{
case Context::FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
return config.getUInt(".threadpool_remote_fs_reader_pool_size", 250);
}
case Context::FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
return config.getUInt(".threadpool_local_fs_reader_pool_size", 100);
}
case Context::FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
return std::numeric_limits<std::size_t>::max();
}
}
}
size_t Context::getThreadPoolReaderSize(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
return getThreadPoolReaderSizeFromConfig(type, config);
}
IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) const
{
const auto & config = getConfigRef();
auto lock = getLock();
switch (type)
@ -4205,31 +4178,20 @@ IAsynchronousReader & Context::getThreadPoolReader(FilesystemReaderType type) co
case FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER:
{
if (!shared->asynchronous_remote_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_remote_fs_reader_queue_size", 1000000);
shared->asynchronous_remote_fs_reader = std::make_unique<ThreadPoolRemoteFSReader>(pool_size, queue_size);
}
shared->asynchronous_remote_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_remote_fs_reader;
}
case FilesystemReaderType::ASYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->asynchronous_local_fs_reader)
{
auto pool_size = getThreadPoolReaderSizeFromConfig(type, config);
auto queue_size = config.getUInt(".threadpool_local_fs_reader_queue_size", 1000000);
shared->asynchronous_local_fs_reader = std::make_unique<ThreadPoolReader>(pool_size, queue_size);
}
shared->asynchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->asynchronous_local_fs_reader;
}
case FilesystemReaderType::SYNCHRONOUS_LOCAL_FS_READER:
{
if (!shared->synchronous_local_fs_reader)
{
shared->synchronous_local_fs_reader = std::make_unique<SynchronousReader>();
}
shared->synchronous_local_fs_reader = createThreadPoolReader(type, getConfigRef());
return *shared->synchronous_local_fs_reader;
}

View File

@ -11,6 +11,7 @@
#include <Core/Settings.h>
#include <Core/UUID.h>
#include <IO/AsyncReadCounters.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Interpreters/ClientInfo.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/DatabaseCatalog.h>
@ -1096,17 +1097,8 @@ public:
OrdinaryBackgroundExecutorPtr getFetchesExecutor() const;
OrdinaryBackgroundExecutorPtr getCommonExecutor() const;
enum class FilesystemReaderType
{
SYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_LOCAL_FS_READER,
ASYNCHRONOUS_REMOTE_FS_READER,
};
IAsynchronousReader & getThreadPoolReader(FilesystemReaderType type) const;
size_t getThreadPoolReaderSize(FilesystemReaderType type) const;
std::shared_ptr<AsyncReadCounters> getAsyncReadCounters() const;
ThreadPool & getThreadPoolWriter() const;

View File

@ -241,13 +241,17 @@ Chain InterpreterInsertQuery::buildChain(
running_group = std::make_shared<ThreadGroup>(getContext());
auto sample = getSampleBlock(columns, table, metadata_snapshot);
return buildChainImpl(table, metadata_snapshot, sample, thread_status_holder, running_group, elapsed_counter_ms);
Chain sink = buildSink(table, metadata_snapshot, thread_status_holder, running_group, elapsed_counter_ms);
Chain chain = buildPreSinkChain(sink.getInputHeader(), table, metadata_snapshot, sample, thread_status_holder);
chain.appendChain(std::move(sink));
return chain;
}
Chain InterpreterInsertQuery::buildChainImpl(
Chain InterpreterInsertQuery::buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms)
@ -258,14 +262,7 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
/// Keep a reference to the context to make sure it stays alive until the chain is executed and destroyed
@ -286,16 +283,48 @@ Chain InterpreterInsertQuery::buildChainImpl(
thread_status_holder, running_group, elapsed_counter_ms);
}
return out;
}
Chain InterpreterInsertQuery::buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder)
{
ThreadStatus * thread_status = current_thread;
if (!thread_status_holder)
thread_status = nullptr;
auto context_ptr = getContext();
const ASTInsertQuery * query = nullptr;
if (query_ptr)
query = query_ptr->as<ASTInsertQuery>();
const Settings & settings = context_ptr->getSettingsRef();
bool null_as_default = query && query->select && context_ptr->getSettingsRef().insert_null_as_default;
/// We create a pipeline of several streams, into which we will write data.
Chain out;
auto input_header = [&]() -> const Block &
{
return out.empty() ? subsequent_header : out.getInputHeader();
};
/// Note that we wrap transforms one on top of another, so we write them in reverse of data processing order.
/// Checking constraints. It must be done after calculation of all defaults, so we can check them on calculated columns.
if (const auto & constraints = metadata_snapshot->getConstraints(); !constraints.empty())
out.addSource(std::make_shared<CheckConstraintsTransform>(
table->getStorageID(), out.getInputHeader(), metadata_snapshot->getConstraints(), context_ptr));
table->getStorageID(), input_header(), metadata_snapshot->getConstraints(), context_ptr));
auto adding_missing_defaults_dag = addMissingDefaults(
query_sample_block,
out.getInputHeader().getNamesAndTypesList(),
input_header().getNamesAndTypesList(),
metadata_snapshot->getColumns(),
context_ptr,
null_as_default);
@ -316,12 +345,12 @@ Chain InterpreterInsertQuery::buildChainImpl(
bool table_prefers_large_blocks = table->prefersLargeBlocks();
out.addSource(std::make_shared<SquashingChunksTransform>(
out.getInputHeader(),
input_header(),
table_prefers_large_blocks ? settings.min_insert_block_size_rows : settings.max_block_size,
table_prefers_large_blocks ? settings.min_insert_block_size_bytes : 0ULL));
}
auto counting = std::make_shared<CountingTransform>(out.getInputHeader(), thread_status, getContext()->getQuota());
auto counting = std::make_shared<CountingTransform>(input_header(), thread_status, getContext()->getQuota());
counting->setProcessListElement(context_ptr->getProcessListElement());
counting->setProgressCallback(context_ptr->getProgressCallback());
out.addSource(std::move(counting));
@ -362,10 +391,20 @@ BlockIO InterpreterInsertQuery::execute()
// Distributed INSERT SELECT
distributed_pipeline = table->distributedWrite(query, getContext());
std::vector<Chain> out_chains;
std::vector<Chain> presink_chains;
std::vector<Chain> sink_chains;
if (!distributed_pipeline || query.watch)
{
size_t out_streams_size = 1;
/// Number of streams works like this:
/// * For the SELECT, use `max_threads`, or `max_insert_threads`, or whatever
/// InterpreterSelectQuery ends up with.
/// * Use `max_insert_threads` streams for various insert-preparation steps, e.g.
/// materializing and squashing (too slow to do in one thread). That's `presink_chains`.
/// * If the table supports parallel inserts, use the same streams for writing to IStorage.
/// Otherwise ResizeProcessor them down to 1 stream.
/// * If it's not an INSERT SELECT, forget all that and use one stream.
size_t pre_streams_size = 1;
size_t sink_streams_size = 1;
if (query.select)
{
@ -441,10 +480,14 @@ BlockIO InterpreterInsertQuery::execute()
pipeline.dropTotalsAndExtremes();
if (table->supportsParallelInsert() && settings.max_insert_threads > 1)
out_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (settings.max_insert_threads > 1)
{
pre_streams_size = std::min(static_cast<size_t>(settings.max_insert_threads), pipeline.getNumStreams());
if (table->supportsParallelInsert())
sink_streams_size = pre_streams_size;
}
pipeline.resize(out_streams_size);
pipeline.resize(pre_streams_size);
/// Allow to insert Nullable into non-Nullable columns, NULL values will be added as defaults values.
if (getContext()->getSettingsRef().insert_null_as_default)
@ -476,13 +519,17 @@ BlockIO InterpreterInsertQuery::execute()
running_group = current_thread->getThreadGroup();
if (!running_group)
running_group = std::make_shared<ThreadGroup>(getContext());
for (size_t i = 0; i < out_streams_size; ++i)
for (size_t i = 0; i < sink_streams_size; ++i)
{
auto out = buildChainImpl(table, metadata_snapshot, query_sample_block,
/* thread_status_holder= */ nullptr,
running_group,
/* elapsed_counter_ms= */ nullptr);
out_chains.emplace_back(std::move(out));
auto out = buildSink(table, metadata_snapshot, /* thread_status_holder= */ nullptr,
running_group, /* elapsed_counter_ms= */ nullptr);
sink_chains.emplace_back(std::move(out));
}
for (size_t i = 0; i < pre_streams_size; ++i)
{
auto out = buildPreSinkChain(sink_chains[0].getInputHeader(), table, metadata_snapshot,
query_sample_block, /* thread_status_holder= */ nullptr);
presink_chains.emplace_back(std::move(out));
}
}
@ -495,7 +542,7 @@ BlockIO InterpreterInsertQuery::execute()
}
else if (query.select || query.watch)
{
const auto & header = out_chains.at(0).getInputHeader();
const auto & header = presink_chains.at(0).getInputHeader();
auto actions_dag = ActionsDAG::makeConvertingActions(
pipeline.getHeader().getColumnsWithTypeAndName(),
header.getColumnsWithTypeAndName(),
@ -516,10 +563,14 @@ BlockIO InterpreterInsertQuery::execute()
size_t num_select_threads = pipeline.getNumThreads();
for (auto & chain : out_chains)
for (auto & chain : presink_chains)
resources = chain.detachResources();
for (auto & chain : sink_chains)
resources = chain.detachResources();
pipeline.addChains(std::move(out_chains));
pipeline.addChains(std::move(presink_chains));
pipeline.resize(sink_chains.size());
pipeline.addChains(std::move(sink_chains));
if (!settings.parallel_view_processing)
{
@ -552,7 +603,8 @@ BlockIO InterpreterInsertQuery::execute()
}
else
{
res.pipeline = QueryPipeline(std::move(out_chains.at(0)));
presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
res.pipeline = QueryPipeline(std::move(presink_chains[0]));
res.pipeline.setNumThreads(std::min<size_t>(res.pipeline.getNumThreads(), settings.max_threads));
if (query.hasInlinedData() && !async_insert)
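Working through the stream-count comment above with assumed numbers: with max_insert_threads = 4, a SELECT that ends up with 8 streams, and a table that supports parallel inserts, the pipeline gets 4 presink chains feeding 4 sink chains; without parallel insert support the sinks collapse to one. A hypothetical standalone restatement of that sizing logic:
#include <algorithm>
#include <cstddef>
#include <utility>
std::pair<size_t, size_t> insertStreamSizes(
    size_t max_insert_threads, size_t select_streams,
    bool supports_parallel_insert, bool is_insert_select)
{
    size_t pre_streams_size = 1;
    size_t sink_streams_size = 1;
    if (is_insert_select && max_insert_threads > 1)
    {
        pre_streams_size = std::min(max_insert_threads, select_streams);
        if (supports_parallel_insert)
            sink_streams_size = pre_streams_size;
    }
    return {pre_streams_size, sink_streams_size};   /// {4, 4} for the numbers above; {4, 1} without parallel insert
}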

View File

@ -66,13 +66,19 @@ private:
std::vector<std::unique_ptr<ReadBuffer>> owned_buffers;
Chain buildChainImpl(
Chain buildSink(
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder,
ThreadGroupPtr running_group,
std::atomic_uint64_t * elapsed_counter_ms);
Chain buildPreSinkChain(
const Block & subsequent_header,
const StoragePtr & table,
const StorageMetadataPtr & metadata_snapshot,
const Block & query_sample_block,
ThreadStatusesHolderPtr thread_status_holder);
};

View File

@ -1,19 +1,14 @@
#include "OpenTelemetrySpanLog.h"
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypeMap.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypeEnum.h>
#include <Interpreters/Context.h>
#include <base/hex.h>
#include <Common/CurrentThread.h>
#include <Core/Field.h>
namespace DB
@ -32,11 +27,13 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
}
);
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return {
{"trace_id", std::make_shared<DataTypeUUID>()},
{"span_id", std::make_shared<DataTypeUInt64>()},
{"parent_span_id", std::make_shared<DataTypeUInt64>()},
{"operation_name", std::make_shared<DataTypeString>()},
{"operation_name", low_cardinality_string},
{"kind", std::move(span_kind_type)},
// DateTime64 is really unwieldy -- there is no "normal" way to convert
// it to an UInt64 count of microseconds, except:
@ -51,15 +48,17 @@ NamesAndTypesList OpenTelemetrySpanLogElement::getNamesAndTypes()
{"start_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_time_us", std::make_shared<DataTypeUInt64>()},
{"finish_date", std::make_shared<DataTypeDate>()},
{"attribute", std::make_shared<DataTypeMap>(std::make_shared<DataTypeString>(), std::make_shared<DataTypeString>())},
{"attribute", std::make_shared<DataTypeMap>(low_cardinality_string, std::make_shared<DataTypeString>())},
};
}
NamesAndAliases OpenTelemetrySpanLogElement::getNamesAndAliases()
{
auto low_cardinality_string = std::make_shared<DataTypeLowCardinality>(std::make_shared<DataTypeString>());
return
{
{"attribute.names", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapKeys(attribute)"},
{"attribute.names", std::make_shared<DataTypeArray>(low_cardinality_string), "mapKeys(attribute)"},
{"attribute.values", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>()), "mapValues(attribute)"}
};
}
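// Editor's note: with the low_cardinality_string change above, the span log's
// columns become operation_name LowCardinality(String), attribute
// Map(LowCardinality(String), String), and the attribute.names alias
// Array(LowCardinality(String)); attribute.values stays Array(String).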
@ -83,4 +82,3 @@ void OpenTelemetrySpanLogElement::appendToBlock(MutableColumns & columns) const
}
}

View File

@ -84,6 +84,7 @@ ThreadGroupPtr ThreadGroup::createForBackgroundProcess(ContextPtr storage_contex
group->memory_tracker.setProfilerStep(settings.memory_profiler_step);
group->memory_tracker.setSampleProbability(settings.memory_profiler_sample_probability);
group->memory_tracker.setSoftLimit(settings.memory_overcommit_ratio_denominator);
group->memory_tracker.setParent(&background_memory_tracker);
if (settings.memory_tracker_fault_probability > 0.0)
group->memory_tracker.setFaultProbability(settings.memory_tracker_fault_probability);

View File

@ -168,7 +168,6 @@ void FinishAggregatingInOrderAlgorithm::addToAggregation()
accumulated_bytes += static_cast<size_t>(static_cast<double>(states[i].total_bytes) * current_rows / states[i].num_rows);
accumulated_rows += current_rows;
if (!states[i].isValid())
inputs_to_update.push_back(i);
}

View File

@ -19,6 +19,30 @@
#include <DataTypes/DataTypeDateTime64.h>
/// See https://fmt.dev/latest/api.html#formatting-user-defined-types
template <>
struct fmt::formatter<DB::RowNumber>
{
static constexpr auto parse(format_parse_context & ctx)
{
const auto * it = ctx.begin();
const auto * end = ctx.end();
/// Only support {}.
if (it != end && *it != '}')
throw fmt::format_error("Invalid format");
return it;
}
template <typename FormatContext>
auto format(const DB::RowNumber & x, FormatContext & ctx)
{
return fmt::format_to(ctx.out(), "{}:{}", x.block, x.row);
}
};
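// Editor's note, a minimal usage sketch of the formatter above with made-up
// values: formatting a RowNumber produces "<block>:<row>", e.g.
//     fmt::print(stderr, "{}\n", DB::RowNumber{.block = 3, .row = 17});
// would print "3:17"; the commented-out debug prints further down in this file
// format RowNumber values this way.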
namespace DB
{
@ -34,7 +58,7 @@ namespace ErrorCodes
// Interface for true window functions. It's not much of an interface, they just
// accept the guts of WindowTransform and do 'something'. Given a small number of
// true window functions, and the fact that the WindowTransform internals are
// pretty much well defined in domain terms (e.g. frame boundaries), this is
// pretty much well-defined in domain terms (e.g. frame boundaries), this is
// somewhat acceptable.
class IWindowFunction
{
@ -323,8 +347,6 @@ void WindowTransform::advancePartitionEnd()
const RowNumber end = blocksEnd();
// fmt::print(stderr, "end {}, partition_end {}\n", end, partition_end);
// If we're at the total end of data, we must end the partition. This is one
// of the few places in calculations where we need special handling for end
// of data, other places will work as usual based on
@ -383,9 +405,6 @@ void WindowTransform::advancePartitionEnd()
const auto block_rows = blockRowsNumber(partition_end);
for (; partition_end.row < block_rows; ++partition_end.row)
{
// fmt::print(stderr, "compare reference '{}' to compared '{}'\n",
// prev_frame_start, partition_end);
size_t i = 0;
for (; i < partition_by_columns; ++i)
{
@ -394,9 +413,6 @@ void WindowTransform::advancePartitionEnd()
const auto * compared_column
= inputAt(partition_end)[partition_by_indices[i]].get();
// fmt::print(stderr, "reference '{}', compared '{}'\n",
// (*reference_column)[prev_frame_start.row],
// (*compared_column)[partition_end.row]);
if (compared_column->compareAt(partition_end.row,
prev_frame_start.row, *reference_column,
1 /* nan_direction_hint */) != 0)
@ -421,26 +437,26 @@ void WindowTransform::advancePartitionEnd()
assert(!partition_ended && partition_end == blocksEnd());
}
auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int64_t offset) const
auto WindowTransform::moveRowNumberNoCheck(const RowNumber & original_row_number, Int64 offset) const
{
RowNumber x = _x;
RowNumber moved_row_number = original_row_number;
if (offset > 0 && x != blocksEnd())
if (offset > 0 && moved_row_number != blocksEnd())
{
for (;;)
{
assertValid(x);
assertValid(moved_row_number);
assert(offset >= 0);
const auto block_rows = blockRowsNumber(x);
x.row += offset;
if (x.row >= block_rows)
const auto block_rows = blockRowsNumber(moved_row_number);
moved_row_number.row += offset;
if (moved_row_number.row >= block_rows)
{
offset = x.row - block_rows;
x.row = 0;
x.block++;
offset = moved_row_number.row - block_rows;
moved_row_number.row = 0;
++moved_row_number.block;
if (x == blocksEnd())
if (moved_row_number == blocksEnd())
{
break;
}
@ -456,56 +472,55 @@ auto WindowTransform::moveRowNumberNoCheck(const RowNumber & _x, int64_t offset)
{
for (;;)
{
assertValid(x);
assertValid(moved_row_number);
assert(offset <= 0);
// abs(offset) is less than INT64_MAX, as checked in the parser, so
// this negation should always work.
assert(offset >= -INT64_MAX);
if (x.row >= static_cast<uint64_t>(-offset))
if (moved_row_number.row >= static_cast<UInt64>(-offset))
{
x.row -= -offset;
moved_row_number.row -= -offset;
offset = 0;
break;
}
// Move to the first row in current block. Note that the offset is
// negative.
offset += x.row;
x.row = 0;
offset += moved_row_number.row;
moved_row_number.row = 0;
// Move to the last row of the previous block, if we are not at the
// first one. The offset is also incremented by one, because we pass over
// the first row of this block.
if (x.block == first_block_number)
if (moved_row_number.block == first_block_number)
{
break;
}
--x.block;
--moved_row_number.block;
offset += 1;
x.row = blockRowsNumber(x) - 1;
moved_row_number.row = blockRowsNumber(moved_row_number) - 1;
}
}
return std::tuple<RowNumber, int64_t>{x, offset};
return std::tuple<RowNumber, Int64>{moved_row_number, offset};
}
auto WindowTransform::moveRowNumber(const RowNumber & _x, int64_t offset) const
auto WindowTransform::moveRowNumber(const RowNumber & original_row_number, Int64 offset) const
{
auto [x, o] = moveRowNumberNoCheck(_x, offset);
auto [moved_row_number, offset_after_move] = moveRowNumberNoCheck(original_row_number, offset);
#ifndef NDEBUG
// Check that it was reversible.
auto [xx, oo] = moveRowNumberNoCheck(x, -(offset - o));
/// Check that it was reversible. If we move back, we get the original row number with zero offset.
const auto [original_row_number_to_validate, offset_after_move_back]
= moveRowNumberNoCheck(moved_row_number, -(offset - offset_after_move));
// fmt::print(stderr, "{} -> {}, result {}, {}, new offset {}, twice {}, {}\n",
// _x, offset, x, o, -(offset - o), xx, oo);
assert(xx == _x);
assert(oo == 0);
assert(original_row_number_to_validate == original_row_number);
assert(0 == offset_after_move_back);
#endif
return std::tuple<RowNumber, int64_t>{x, o};
return std::tuple<RowNumber, Int64>{moved_row_number, offset_after_move};
}
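// Editor's worked example for the reversibility check above, assuming every
// block holds 10 rows: moveRowNumberNoCheck({0, 8}, +5) lands on {1, 3} with
// remaining offset 0, and moving {1, 3} back by -5 returns {0, 8} with offset 0,
// which is exactly what the debug-only asserts verify.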
@ -520,9 +535,6 @@ void WindowTransform::advanceFrameStartRowsOffset()
assertValid(frame_start);
// fmt::print(stderr, "frame start {} left {} partition start {}\n",
// frame_start, offset_left, partition_start);
if (frame_start <= partition_start)
{
// Got to the beginning of partition and can't go further back.
@ -685,8 +697,6 @@ bool WindowTransform::arePeers(const RowNumber & x, const RowNumber & y) const
void WindowTransform::advanceFrameEndCurrentRow()
{
// fmt::print(stderr, "starting from frame_end {}\n", frame_end);
// We only process one block here, and frame_end must be already in it: if
// we didn't find the end in the previous block, frame_end is now the first
// row of the current block. We need this knowledge to write a simpler loop
@ -708,7 +718,7 @@ void WindowTransform::advanceFrameEndCurrentRow()
// We advance until the partition end. It's either in the current block or
// in the next one, which is also the past-the-end block. Figure out how
// many rows we have to process.
uint64_t rows_end;
UInt64 rows_end;
if (partition_end.row == 0)
{
assert(partition_end == blocksEnd());
@ -722,14 +732,11 @@ void WindowTransform::advanceFrameEndCurrentRow()
// Equality would mean "no data to process", which we checked for above.
assert(frame_end.row < rows_end);
// fmt::print(stderr, "first row {} last {}\n", frame_end.row, rows_end);
// Advance frame_end while it is still peers with the current row.
for (; frame_end.row < rows_end; ++frame_end.row)
{
if (!arePeers(current_row, frame_end))
{
// fmt::print(stderr, "{} and {} don't match\n", reference, frame_end);
frame_ended = true;
return;
}
@ -852,8 +859,6 @@ void WindowTransform::advanceFrameEnd()
break;
}
// fmt::print(stderr, "frame_end {} -> {}\n", frame_end_before, frame_end);
// We might not have advanced the frame end if we found out we reached the
// end of input or the partition, or if we still don't know the frame start.
if (frame_end_before == frame_end)
@ -865,9 +870,6 @@ void WindowTransform::advanceFrameEnd()
// Update the aggregation states after the frame has changed.
void WindowTransform::updateAggregationState()
{
// fmt::print(stderr, "update agg states [{}, {}) -> [{}, {})\n",
// prev_frame_start, prev_frame_end, frame_start, frame_end);
// Assert that the frame boundaries are known, have proper order wrt each
// other, and have not gone back wrt the previous frame.
assert(frame_started);
@ -915,7 +917,6 @@ void WindowTransform::updateAggregationState()
if (reset_aggregation)
{
// fmt::print(stderr, "(2) reset aggregation\n");
a->destroy(buf);
a->create(buf);
}
@ -991,9 +992,6 @@ void WindowTransform::writeOutCurrentRow()
a->insertMergeResultInto(buf, *result_column, arena.get());
}
}
// fmt::print(stderr, "wrote out aggregation state for current row '{}'\n",
// current_row);
}
static void assertSameColumns(const Columns & left_all,
@ -1030,10 +1028,6 @@ static void assertSameColumns(const Columns & left_all,
void WindowTransform::appendChunk(Chunk & chunk)
{
// fmt::print(stderr, "new chunk, {} rows, finished={}\n", chunk.getNumRows(),
// input_is_finished);
// fmt::print(stderr, "chunk structure '{}'\n", chunk.dumpStructure());
// First, prepare the new input block and add it to the queue. We might not
// have it if it's end of data, though.
if (!input_is_finished)
@ -1093,9 +1087,6 @@ void WindowTransform::appendChunk(Chunk & chunk)
for (;;)
{
advancePartitionEnd();
// fmt::print(stderr, "partition [{}, {}), {}\n",
// partition_start, partition_end, partition_ended);
// Either we ran out of data or we found the end of partition (maybe
// both, but this only happens at the total end of data).
assert(partition_ended || partition_end == blocksEnd());
@ -1109,10 +1100,6 @@ void WindowTransform::appendChunk(Chunk & chunk)
// which is precisely the definition of `partition_end`.
while (current_row < partition_end)
{
// fmt::print(stderr, "(1) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// We now know that the current row is valid, so we can update the
// peer group start.
if (!arePeers(peer_group_start, current_row))
@ -1152,10 +1139,6 @@ void WindowTransform::appendChunk(Chunk & chunk)
return;
}
// fmt::print(stderr, "(2) row {} frame [{}, {}) {}, {}\n",
// current_row, frame_start, frame_end,
// frame_started, frame_ended);
// The frame can be empty sometimes, e.g. the boundaries coincide
// or the start is after the partition end. But hopefully start is
// not after end.
@ -1236,8 +1219,6 @@ void WindowTransform::appendChunk(Chunk & chunk)
peer_group_start_row_number = 1;
peer_group_number = 1;
// fmt::print(stderr, "reinitialize agg data at start of {}\n",
// partition_start);
// Reinitialize the aggregate function states because the new partition
// has started.
for (auto & ws : workspaces)
@ -1278,10 +1259,6 @@ void WindowTransform::appendChunk(Chunk & chunk)
IProcessor::Status WindowTransform::prepare()
{
// fmt::print(stderr, "prepare, next output {}, not ready row {}, first block {}, hold {} blocks\n",
// next_output_block_number, first_not_ready_row, first_block_number,
// blocks.size());
if (output.isFinished() || isCancelled())
{
// The consumer asked us not to continue (or we decided it ourselves),
@ -1325,10 +1302,6 @@ IProcessor::Status WindowTransform::prepare()
}
output_data.chunk.setColumns(columns, block.rows);
// fmt::print(stderr, "output block {} as chunk '{}'\n",
// next_output_block_number,
// output_data.chunk.dumpStructure());
++next_output_block_number;
output.pushData(std::move(output_data));
@ -1428,9 +1401,6 @@ void WindowTransform::work()
std::min(prev_frame_start.block, current_row.block));
if (first_block_number < first_used_block)
{
// fmt::print(stderr, "will drop blocks from {} to {}\n", first_block_number,
// first_used_block);
blocks.erase(blocks.begin(),
blocks.begin() + (first_used_block - first_block_number));
first_block_number = first_used_block;
@ -2196,7 +2166,7 @@ struct WindowFunctionLagLeadInFrame final : public WindowFunction
IColumn & to = *current_block.output_columns[function_index];
const auto & workspace = transform->workspaces[function_index];
int64_t offset = 1;
Int64 offset = 1;
if (argument_types.size() > 1)
{
offset = (*current_block.input_columns[
@ -2286,7 +2256,7 @@ struct WindowFunctionNthValue final : public WindowFunction
IColumn & to = *current_block.output_columns[function_index];
const auto & workspace = transform->workspaces[function_index];
int64_t offset = (*current_block.input_columns[
Int64 offset = (*current_block.input_columns[
workspace.argument_column_indices[1]])[
transform->current_row.row].get<Int64>();

View File

@ -8,6 +8,10 @@
#include <deque>
/// See https://stackoverflow.com/questions/72533435/error-zero-as-null-pointer-constant-while-comparing-template-class-using-spaces
#pragma clang diagnostic push
#pragma clang diagnostic ignored "-Wzero-as-null-pointer-constant"
namespace DB
{
@ -34,7 +38,7 @@ struct WindowFunctionWorkspace
// Argument columns. Be careful, this is a per-block cache.
std::vector<const IColumn *> argument_columns;
uint64_t cached_block_number = std::numeric_limits<uint64_t>::max();
UInt64 cached_block_number = std::numeric_limits<UInt64>::max();
};
struct WindowTransformBlock
@ -48,28 +52,14 @@ struct WindowTransformBlock
struct RowNumber
{
uint64_t block = 0;
uint64_t row = 0;
UInt64 block = 0;
UInt64 row = 0;
bool operator < (const RowNumber & other) const
{
return block < other.block
|| (block == other.block && row < other.row);
}
bool operator == (const RowNumber & other) const
{
return block == other.block && row == other.row;
}
bool operator <= (const RowNumber & other) const
{
return *this < other || *this == other;
}
auto operator <=>(const RowNumber &) const = default;
};
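// Editor's note: the defaulted three-way comparison above compares members in
// declaration order (block first, then row) and implicitly provides a defaulted
// operator== as well, so it reproduces the hand-written <, == and <= removed by
// this change; e.g. RowNumber{1, 5} < RowNumber{2, 0} and RowNumber{1, 5} <= RowNumber{1, 5}.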
/*
* Computes several window functions that share the same window. The input must
/* Computes several window functions that share the same window. The input must
* be sorted by PARTITION BY (in any order), then by ORDER BY.
* We need to track the following pointers:
* 1) boundaries of partition -- rows that compare equal w/PARTITION BY.
@ -103,19 +93,16 @@ public:
static Block transformHeader(Block header, const ExpressionActionsPtr & expression);
/*
* (former) Implementation of ISimpleTransform.
/* (former) Implementation of ISimpleTransform.
*/
void appendChunk(Chunk & chunk) /*override*/;
/*
* Implementation of IProcessor;
/* Implementation of IProcessor;
*/
Status prepare() override;
void work() override;
/*
* Implementation details.
/* Implementation details.
*/
void advancePartitionEnd();
@ -146,14 +133,14 @@ public:
return const_cast<WindowTransform *>(this)->inputAt(x);
}
auto & blockAt(const uint64_t block_number)
auto & blockAt(const UInt64 block_number)
{
assert(block_number >= first_block_number);
assert(block_number - first_block_number < blocks.size());
return blocks[block_number - first_block_number];
}
const auto & blockAt(const uint64_t block_number) const
const auto & blockAt(const UInt64 block_number) const
{
return const_cast<WindowTransform *>(this)->blockAt(block_number);
}
@ -188,7 +175,7 @@ public:
const auto block_rows = blockAt(x).rows;
assert(x.row < block_rows);
x.row++;
++x.row;
if (x.row < block_rows)
{
return;
@ -237,20 +224,16 @@ public:
return result;
}
auto moveRowNumber(const RowNumber & _x, int64_t offset) const;
auto moveRowNumberNoCheck(const RowNumber & _x, int64_t offset) const;
auto moveRowNumber(const RowNumber & original_row_number, Int64 offset) const;
auto moveRowNumberNoCheck(const RowNumber & original_row_number, Int64 offset) const;
void assertValid(const RowNumber & x) const
{
assert(x.block >= first_block_number);
if (x.block == first_block_number + blocks.size())
{
assert(x.row == 0);
}
else
{
assert(x.row < blockRowsNumber(x));
}
}
RowNumber blocksEnd() const
@ -263,8 +246,7 @@ public:
return RowNumber{first_block_number, 0};
}
/*
* Data (formerly) inherited from ISimpleTransform, needed for the
/* Data (formerly) inherited from ISimpleTransform, needed for the
* implementation of the IProcessor interface.
*/
InputPort & input;
@ -276,8 +258,7 @@ public:
bool has_output = false;
Port::Data output_data;
/*
* Data for window transform itself.
/* Data for window transform itself.
*/
Block input_header;
@ -300,9 +281,9 @@ public:
// have an always-incrementing index. The index of the first block is in
// `first_block_number`.
std::deque<WindowTransformBlock> blocks;
uint64_t first_block_number = 0;
UInt64 first_block_number = 0;
// The next block we are going to pass to the consumer.
uint64_t next_output_block_number = 0;
UInt64 next_output_block_number = 0;
// The first row for which we still haven't calculated the window functions.
// Used to determine which resulting blocks we can pass to the consumer.
RowNumber first_not_ready_row;
@ -326,9 +307,9 @@ public:
RowNumber peer_group_start;
// Row and group numbers in partition for calculating rank() and friends.
uint64_t current_row_number = 1;
uint64_t peer_group_start_row_number = 1;
uint64_t peer_group_number = 1;
UInt64 current_row_number = 1;
UInt64 peer_group_start_row_number = 1;
UInt64 peer_group_number = 1;
// The frame is [frame_start, frame_end) if frame_ended && frame_started,
// and unknown otherwise. Note that when we move to the next row, both the
@ -353,34 +334,13 @@ public:
// Comparison function for RANGE OFFSET frames. We choose the appropriate
// overload once, based on the type of the ORDER BY column. Choosing it for
// each row would be slow.
int (* compare_values_with_offset) (
std::function<int(
const IColumn * compared_column, size_t compared_row,
const IColumn * reference_column, size_t reference_row,
const Field & offset,
bool offset_is_preceding);
bool offset_is_preceding)> compare_values_with_offset;
};
}
/// See https://fmt.dev/latest/api.html#formatting-user-defined-types
template <>
struct fmt::formatter<DB::RowNumber>
{
static constexpr auto parse(format_parse_context & ctx)
{
const auto * it = ctx.begin();
const auto * end = ctx.end();
/// Only support {}.
if (it != end && *it != '}')
throw fmt::format_error("invalid format");
return it;
}
template <typename FormatContext>
auto format(const DB::RowNumber & x, FormatContext & ctx)
{
return fmt::format_to(ctx.out(), "{}:{}", x.block, x.row);
}
};
#pragma clang diagnostic pop

View File

@ -99,6 +99,14 @@ void Chain::addSink(ProcessorPtr processor)
processors.emplace_back(std::move(processor));
}
void Chain::appendChain(Chain chain)
{
connect(getOutputPort(), chain.getInputPort());
processors.splice(processors.end(), std::move(chain.processors));
attachResources(chain.detachResources());
num_threads += chain.num_threads;
}
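// Editor's sketch of how appendChain() is used, taken from the
// InterpreterInsertQuery change earlier in this diff (single-stream INSERT case):
//     presink_chains.at(0).appendChain(std::move(sink_chains.at(0)));
//     res.pipeline = QueryPipeline(std::move(presink_chains[0]));
// i.e. the pre-sink chain's output port is connected to the sink chain's input
// port and their processors are merged into one chain.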
IProcessor & Chain::getSource()
{
checkInitialized(processors);

View File

@ -7,6 +7,10 @@
namespace DB
{
/// Has one unconnected input port and one unconnected output port.
/// There may be other ports on the processors, but they must all be connected.
/// The unconnected input must be on the first processor, the unconnected output on the last.
/// The processors don't necessarily form an actual chain.
class Chain
{
public:
@ -27,6 +31,7 @@ public:
void addSource(ProcessorPtr processor);
void addSink(ProcessorPtr processor);
void appendChain(Chain chain);
IProcessor & getSource();
IProcessor & getSink();
@ -44,7 +49,11 @@ public:
void addStorageHolder(StoragePtr storage) { holder.storage_holders.emplace_back(std::move(storage)); }
void addInterpreterContext(ContextPtr context) { holder.interpreter_context.emplace_back(std::move(context)); }
void attachResources(QueryPlanResourceHolder holder_) { holder = std::move(holder_); }
void attachResources(QueryPlanResourceHolder holder_)
{
/// This operator "=" actually merges holder_ into holder, doesn't replace.
holder = std::move(holder_);
}
QueryPlanResourceHolder detachResources() { return std::move(holder); }
void reset();

View File

@ -1,7 +1,7 @@
#include <Server/ProtocolServerAdapter.h>
#include <Server/TCPServer.h>
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
#include <Server/GRPCServer.h>
#endif
@ -37,7 +37,7 @@ ProtocolServerAdapter::ProtocolServerAdapter(
{
}
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
class ProtocolServerAdapter::GRPCServerAdapterImpl : public Impl
{
public:

View File

@ -23,7 +23,7 @@ public:
ProtocolServerAdapter & operator =(ProtocolServerAdapter && src) = default;
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<TCPServer> tcp_server_);
#if USE_GRPC && !defined(KEEPER_STANDALONE_BUILD)
#if USE_GRPC && !defined(CLICKHOUSE_PROGRAM_STANDALONE_BUILD)
ProtocolServerAdapter(const std::string & listen_host_, const char * port_name_, const std::string & description_, std::unique_ptr<GRPCServer> grpc_server_);
#endif

View File

@ -67,7 +67,7 @@ bool AsynchronousReadBufferFromHDFS::hasPendingDataToRead()
std::future<IAsynchronousReader::Result> AsynchronousReadBufferFromHDFS::asyncReadInto(char * data, size_t size, int64_t priority)
{
IAsynchronousReader::Request request;
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl);
request.descriptor = std::make_shared<RemoteFSFileDescriptor>(*impl, nullptr);
request.buf = data;
request.size = size;
request.offset = file_offset_of_buffer_end;

View File

@ -21,6 +21,7 @@
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <IO/ReadBufferFromString.h>
#include <Disks/IO/getThreadPoolReader.h>
#include <Storages/Cache/ExternalDataSourceCache.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTCreateQuery.h>
@ -232,7 +233,7 @@ public:
if (thread_pool_read)
{
return std::make_unique<AsynchronousReadBufferFromHDFS>(
IObjectStorage::getThreadPoolReader(), read_settings, std::move(buf));
getThreadPoolReader(FilesystemReaderType::ASYNCHRONOUS_REMOTE_FS_READER), read_settings, std::move(buf));
}
else
{

View File

@ -146,6 +146,8 @@ public:
virtual bool supportsReplication() const { return false; }
/// Returns true if the storage supports parallel insert.
/// If false, each INSERT query will call write() only once.
/// Different INSERT queries may write in parallel regardless of this value.
virtual bool supportsParallelInsert() const { return false; }
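/// Editor's illustration (hypothetical engine, not part of this change): a
/// storage that can accept concurrent writes from a single INSERT SELECT would
/// opt in with
///     bool supportsParallelInsert() const override { return true; }
/// which lets InterpreterInsertQuery keep as many sink streams as pre-sink
/// streams instead of resizing down to a single sink.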
/// Returns true if the storage supports deduplication of inserted data blocks.

View File

@ -80,4 +80,9 @@ MergeInfo MergeListElement::getInfo() const
return res;
}
MergeListElement::~MergeListElement()
{
background_memory_tracker.adjustOnBackgroundTaskEnd(&getMemoryTracker());
}
}

View File

@ -115,6 +115,8 @@ struct MergeListElement : boost::noncopyable
MergeListElement * ptr() { return this; }
MergeListElement & ref() { return *this; }
~MergeListElement();
};
/** Maintains a list of currently running merges.

View File

@ -118,10 +118,8 @@ namespace ProfileEvents
extern const Event DelayedInsertsMilliseconds;
extern const Event InsertedWideParts;
extern const Event InsertedCompactParts;
extern const Event InsertedInMemoryParts;
extern const Event MergedIntoWideParts;
extern const Event MergedIntoCompactParts;
extern const Event MergedIntoInMemoryParts;
extern const Event RejectedMutations;
extern const Event DelayedMutations;
extern const Event DelayedMutationsMilliseconds;
@ -385,8 +383,7 @@ MergeTreeData::MergeTreeData(
String reason;
if (!canUsePolymorphicParts(*settings, &reason) && !reason.empty())
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part', 'min_bytes_for_wide_part', "
"'min_rows_for_compact_part' and 'min_bytes_for_compact_part' will be ignored.", reason);
LOG_WARNING(log, "{} Settings 'min_rows_for_wide_part' and 'min_bytes_for_wide_part' will be ignored.", reason);
#if !USE_ROCKSDB
if (use_metadata_cache)
@ -2354,22 +2351,6 @@ void MergeTreeData::removePartsFinally(const MergeTreeData::DataPartsVector & pa
}
}
void MergeTreeData::flushAllInMemoryPartsIfNeeded()
{
if (getSettings()->in_memory_parts_enable_wal)
return;
auto metadata_snapshot = getInMemoryMetadataPtr();
DataPartsVector parts = getDataPartsVectorForInternalUsage();
for (const auto & part : parts)
{
if (auto part_in_memory = asInMemoryPart(part))
{
part_in_memory->flushToDisk(part_in_memory->getDataPartStorage().getPartDirectory(), metadata_snapshot);
}
}
}
size_t MergeTreeData::clearOldPartsFromFilesystem(bool force)
{
DataPartsVector parts_to_remove = grabOldParts(force);
@ -2451,17 +2432,19 @@ void MergeTreeData::clearPartsFromFilesystemImpl(const DataPartsVector & parts_t
std::mutex part_names_mutex;
ThreadPool pool(CurrentMetrics::MergeTreePartsCleanerThreads, CurrentMetrics::MergeTreePartsCleanerThreadsActive, num_threads);
bool has_zero_copy_parts = false;
/// This flag disallows straightforward concurrent parts removal. It's required only in the case
/// when we have parts on a zero-copy disk and at least some of them were mutated.
bool remove_parts_in_order = false;
if (settings->allow_remote_fs_zero_copy_replication && dynamic_cast<StorageReplicatedMergeTree *>(this) != nullptr)
{
has_zero_copy_parts = std::any_of(
remove_parts_in_order = std::any_of(
parts_to_remove.begin(), parts_to_remove.end(),
[] (const auto & data_part) { return data_part->isStoredOnRemoteDiskWithZeroCopySupport(); }
[] (const auto & data_part) { return data_part->isStoredOnRemoteDiskWithZeroCopySupport() && data_part->info.getMutationVersion() > 0; }
);
}
if (!has_zero_copy_parts)
if (!remove_parts_in_order)
{
/// NOTE: Under heavy system load you may get "Cannot schedule a task" from ThreadPool.
LOG_DEBUG(
@ -3340,7 +3323,7 @@ void MergeTreeData::checkMutationIsPossible(const MutationCommands & /*commands*
/// Some validation will be added
}
MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompressed, size_t rows_count, bool only_on_disk) const
MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const
{
using PartType = MergeTreeDataPartType;
using PartStorageType = MergeTreeDataPartStorageType;
@ -3354,9 +3337,6 @@ MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompresse
return bytes_uncompressed < min_bytes_for || rows_count < min_rows_for;
};
if (!only_on_disk && satisfies(settings->min_bytes_for_compact_part, settings->min_rows_for_compact_part))
return {PartType::InMemory, PartStorageType::Full};
auto part_type = PartType::Wide;
if (satisfies(settings->min_bytes_for_wide_part, settings->min_rows_for_wide_part))
part_type = PartType::Compact;
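/// Editor's note: with the InMemory branch removed above, the format choice is
/// simply: parts below either of the min_bytes_for_wide_part /
/// min_rows_for_wide_part thresholds become Compact, everything else Wide; per
/// the MergeTreeDataPartType change further below, new InMemory parts can no
/// longer be created at all.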
@ -3366,7 +3346,7 @@ MergeTreeDataPartFormat MergeTreeData::choosePartFormat(size_t bytes_uncompresse
MergeTreeDataPartFormat MergeTreeData::choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const
{
return choosePartFormat(bytes_uncompressed, rows_count, true);
return choosePartFormat(bytes_uncompressed, rows_count);
}
MergeTreeDataPartBuilder MergeTreeData::getDataPartBuilder(
@ -6144,19 +6124,6 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
}
}
MergeTreeData::WriteAheadLogPtr wal;
auto get_inited_wal = [&] ()
{
if (!wal)
wal = data.getWriteAheadLog();
return wal;
};
if (settings->in_memory_parts_enable_wal)
for (const auto & part : precommitted_parts)
if (auto part_in_memory = asInMemoryPart(part))
get_inited_wal()->addPart(part_in_memory);
NOEXCEPT_SCOPE({
auto current_time = time(nullptr);
@ -6200,10 +6167,6 @@ MergeTreeData::DataPartsVector MergeTreeData::Transaction::commit(MergeTreeData:
data.modifyPartState(covered_part, DataPartState::Outdated);
data.removePartContributionToColumnAndSecondaryIndexSizes(covered_part);
if (settings->in_memory_parts_enable_wal)
if (isInMemoryPart(covered_part))
get_inited_wal()->dropPart(covered_part->name);
}
reduce_parts += covered_parts.size();
@ -7884,11 +7847,8 @@ bool MergeTreeData::canUsePolymorphicParts(const MergeTreeSettings & settings, S
"Table can't create parts with adaptive granularity, but settings"
" min_rows_for_wide_part = {}"
", min_bytes_for_wide_part = {}"
", min_rows_for_compact_part = {}"
", min_bytes_for_compact_part = {}"
". Parts with non-adaptive granularity can be stored only in Wide (default) format.",
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part,
settings.min_rows_for_compact_part, settings.min_bytes_for_compact_part);
settings.min_rows_for_wide_part, settings.min_bytes_for_wide_part);
}
return false;
@ -8268,9 +8228,6 @@ void MergeTreeData::incrementInsertedPartsProfileEvent(MergeTreeDataPartType typ
case MergeTreeDataPartType::Compact:
ProfileEvents::increment(ProfileEvents::InsertedCompactParts);
break;
case MergeTreeDataPartType::InMemory:
ProfileEvents::increment(ProfileEvents::InsertedInMemoryParts);
break;
default:
break;
}
@ -8286,9 +8243,6 @@ void MergeTreeData::incrementMergedPartsProfileEvent(MergeTreeDataPartType type)
case MergeTreeDataPartType::Compact:
ProfileEvents::increment(ProfileEvents::MergedIntoCompactParts);
break;
case MergeTreeDataPartType::InMemory:
ProfileEvents::increment(ProfileEvents::MergedIntoInMemoryParts);
break;
default:
break;
}

View File

@ -226,7 +226,7 @@ public:
using OperationDataPartsLock = std::unique_lock<std::mutex>;
OperationDataPartsLock lockOperationsWithParts() const { return OperationDataPartsLock(operation_with_data_parts_mutex); }
MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count, bool only_on_disk = false) const;
MergeTreeDataPartFormat choosePartFormat(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartFormat choosePartFormatOnDisk(size_t bytes_uncompressed, size_t rows_count) const;
MergeTreeDataPartBuilder getDataPartBuilder(const String & name, const VolumePtr & volume, const String & part_dir) const;
@ -661,9 +661,6 @@ public:
/// Removes parts from data_parts, they should be in Deleting state
void removePartsFinally(const DataPartsVector & parts);
/// When WAL is not enabled, the InMemoryParts need to be persistent.
void flushAllInMemoryPartsIfNeeded();
/// Delete irrelevant parts from memory and disk.
/// If 'force' - don't wait for old_parts_lifetime.
size_t clearOldPartsFromFilesystem(bool force = false);

View File

@ -44,7 +44,7 @@ public:
/// Data of all columns is stored in one file. Marks are also stored in single file.
Compact,
/// Format with buffering data in RAM.
/// Format with buffering data in RAM. Obsolete - new parts cannot be created in this format.
InMemory,
Unknown,

Some files were not shown because too many files have changed in this diff.