Merge branch 'master' into async-loader-integration

This commit is contained in:
Sergei Trifonov 2023-07-19 18:45:41 +02:00 committed by GitHub
commit b8a46ff822
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
169 changed files with 3803 additions and 1113 deletions

View File

@ -2870,6 +2870,216 @@ jobs:
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan0:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=0
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan1:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=1
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan2:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=2
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan3:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=3
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan4:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=4
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsAnalyzerAsan5:
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Set envs
run: |
cat >> "$GITHUB_ENV" << 'EOF'
TEMP_PATH=${{runner.temp}}/integration_tests_asan
REPORTS_PATH=${{runner.temp}}/reports_dir
CHECK_NAME=Integration tests (asan, analyzer)
REPO_COPY=${{runner.temp}}/integration_tests_asan/ClickHouse
RUN_BY_HASH_NUM=5
RUN_BY_HASH_TOTAL=6
EOF
- name: Download json reports
uses: actions/download-artifact@v3
with:
path: ${{ env.REPORTS_PATH }}
- name: Check out repository code
uses: ClickHouse/checkout@v1
with:
clear-repository: true
- name: Integration test
run: |
sudo rm -fr "$TEMP_PATH"
mkdir -p "$TEMP_PATH"
cp -r "$GITHUB_WORKSPACE" "$TEMP_PATH"
cd "$REPO_COPY/tests/ci"
python3 integration_test_check.py "$CHECK_NAME"
- name: Cleanup
if: always()
run: |
docker ps --quiet | xargs --no-run-if-empty docker kill ||:
docker ps --all --quiet | xargs --no-run-if-empty docker rm -f ||:
sudo rm -fr "$TEMP_PATH"
IntegrationTestsTsan0:
needs: [BuilderDebTsan]
runs-on: [self-hosted, stress-tester]
@ -3963,6 +4173,12 @@ jobs:
- IntegrationTestsAsan3
- IntegrationTestsAsan4
- IntegrationTestsAsan5
- IntegrationTestsAnalyzerAsan0
- IntegrationTestsAnalyzerAsan1
- IntegrationTestsAnalyzerAsan2
- IntegrationTestsAnalyzerAsan3
- IntegrationTestsAnalyzerAsan4
- IntegrationTestsAnalyzerAsan5
- IntegrationTestsRelease0
- IntegrationTestsRelease1
- IntegrationTestsRelease2

View File

@ -5099,6 +5099,12 @@ jobs:
- IntegrationTestsAsan3
- IntegrationTestsAsan4
- IntegrationTestsAsan5
- IntegrationTestsAnalyzerAsan0
- IntegrationTestsAnalyzerAsan1
- IntegrationTestsAnalyzerAsan2
- IntegrationTestsAnalyzerAsan3
- IntegrationTestsAnalyzerAsan4
- IntegrationTestsAnalyzerAsan5
- IntegrationTestsRelease0
- IntegrationTestsRelease1
- IntegrationTestsRelease2

View File

@ -1,43 +1,38 @@
# Usage:
# set (MAX_COMPILER_MEMORY 2000 CACHE INTERNAL "") # In megabytes
# set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "")
# include (cmake/limit_jobs.cmake)
# Limit compiler/linker job concurrency to avoid OOMs on subtrees where compilation/linking is memory-intensive.
#
# Usage from CMake:
# set (MAX_COMPILER_MEMORY 2000 CACHE INTERNAL "") # megabyte
# set (MAX_LINKER_MEMORY 3500 CACHE INTERNAL "") # megabyte
# include (cmake/limit_jobs.cmake)
#
# (bigger values mean fewer jobs)
cmake_host_system_information(RESULT TOTAL_PHYSICAL_MEMORY QUERY TOTAL_PHYSICAL_MEMORY) # Not available under freebsd
cmake_host_system_information(RESULT TOTAL_PHYSICAL_MEMORY QUERY TOTAL_PHYSICAL_MEMORY)
cmake_host_system_information(RESULT NUMBER_OF_LOGICAL_CORES QUERY NUMBER_OF_LOGICAL_CORES)
# 1 if not set
option(PARALLEL_COMPILE_JOBS "Maximum number of concurrent compilation jobs" "")
# Set to disable the automatic job-limiting
option(PARALLEL_COMPILE_JOBS "Maximum number of concurrent compilation jobs" OFF)
option(PARALLEL_LINK_JOBS "Maximum number of concurrent link jobs" OFF)
# 1 if not set
option(PARALLEL_LINK_JOBS "Maximum number of concurrent link jobs" "")
if (NOT PARALLEL_COMPILE_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_COMPILER_MEMORY)
if (NOT PARALLEL_COMPILE_JOBS AND MAX_COMPILER_MEMORY)
math(EXPR PARALLEL_COMPILE_JOBS ${TOTAL_PHYSICAL_MEMORY}/${MAX_COMPILER_MEMORY})
if (NOT PARALLEL_COMPILE_JOBS)
set (PARALLEL_COMPILE_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_COMPILE_JOBS_LESS TRUE)
if (PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
message(WARNING "The auto-calculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
endif ()
if (PARALLEL_COMPILE_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES))
set(CMAKE_JOB_POOL_COMPILE compile_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_COMPILE ${CMAKE_JOB_POOL_COMPILE})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_COMPILE}=${PARALLEL_COMPILE_JOBS})
endif ()
if (NOT PARALLEL_LINK_JOBS AND TOTAL_PHYSICAL_MEMORY AND MAX_LINKER_MEMORY)
if (NOT PARALLEL_LINK_JOBS AND MAX_LINKER_MEMORY)
math(EXPR PARALLEL_LINK_JOBS ${TOTAL_PHYSICAL_MEMORY}/${MAX_LINKER_MEMORY})
if (NOT PARALLEL_LINK_JOBS)
set (PARALLEL_LINK_JOBS 1)
endif ()
if (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set (PARALLEL_LINK_JOBS_LESS TRUE)
if (PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
message(WARNING "The auto-calculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()
@ -52,20 +47,16 @@ if (CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO" AND ENABLE_THINLTO AND PARALLE
set (PARALLEL_LINK_JOBS 2)
endif()
if (PARALLEL_LINK_JOBS AND (NOT NUMBER_OF_LOGICAL_CORES OR PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES))
message(STATUS "Building sub-tree with ${PARALLEL_COMPILE_JOBS} compile jobs and ${PARALLEL_LINK_JOBS} linker jobs (system: ${NUMBER_OF_LOGICAL_CORES} cores, ${TOTAL_PHYSICAL_MEMORY} MB DRAM, 'OFF' means the native core count).")
if (PARALLEL_COMPILE_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set(CMAKE_JOB_POOL_COMPILE compile_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_COMPILE ${CMAKE_JOB_POOL_COMPILE})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_COMPILE}=${PARALLEL_COMPILE_JOBS})
endif ()
if (PARALLEL_LINK_JOBS LESS NUMBER_OF_LOGICAL_CORES)
set(CMAKE_JOB_POOL_LINK link_job_pool${CMAKE_CURRENT_SOURCE_DIR})
string (REGEX REPLACE "[^a-zA-Z0-9]+" "_" CMAKE_JOB_POOL_LINK ${CMAKE_JOB_POOL_LINK})
set_property(GLOBAL APPEND PROPERTY JOB_POOLS ${CMAKE_JOB_POOL_LINK}=${PARALLEL_LINK_JOBS})
endif ()
if (PARALLEL_COMPILE_JOBS OR PARALLEL_LINK_JOBS)
message(STATUS
"${CMAKE_CURRENT_SOURCE_DIR}: Have ${TOTAL_PHYSICAL_MEMORY} megabytes of memory.
Limiting concurrent linkers jobs to ${PARALLEL_LINK_JOBS} and compiler jobs to ${PARALLEL_COMPILE_JOBS} (system has ${NUMBER_OF_LOGICAL_CORES} logical cores)")
if (PARALLEL_COMPILE_JOBS_LESS)
message(WARNING "The autocalculated compile jobs limit (${PARALLEL_COMPILE_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_COMPILE_JOBS to override.")
endif()
if (PARALLEL_LINK_JOBS_LESS)
message(WARNING "The autocalculated link jobs limit (${PARALLEL_LINK_JOBS}) underutilizes CPU cores (${NUMBER_OF_LOGICAL_CORES}). Set PARALLEL_LINK_JOBS to override.")
endif()
endif ()

2
contrib/cctz vendored

@ -1 +1 @@
Subproject commit 5e05432420f9692418e2e12aff09859e420b14a2
Subproject commit 8529bcef5cd996b7c0f4d7475286b76b5d126c4c

View File

@ -30,7 +30,7 @@ description: In order to effectively mitigate possible human errors, you should
```
:::note ALL
`ALL` is only applicable to the `RESTORE` command prior to version 23.4 of Clickhouse.
Prior to version 23.4 of ClickHouse, `ALL` was only applicable to the `RESTORE` command.
:::
## Background

View File

@ -4524,6 +4524,7 @@ This setting allows to specify renaming pattern for files processed by `file` ta
### Placeholders
- `%a` — Full original filename (e.g., "sample.csv").
- `%f` — Original filename without extension (e.g., "sample").
- `%e` — Original file extension with dot (e.g., ".csv").
- `%t` — Timestamp (in microseconds).

View File

@ -0,0 +1,32 @@
---
slug: /en/sql-reference/aggregate-functions/reference/array_concat_agg
sidebar_position: 110
---
# array_concat_agg
- Alias of `groupArrayArray`. The function is case insensitive.
**Example**
```text
SELECT *
FROM t
┌─a───────┐
│ [1,2,3] │
│ [4,5] │
│ [6] │
└─────────┘
```
Query:
```sql
SELECT array_concat_agg(a) AS a
FROM t
┌─a─────────────┐
│ [1,2,3,4,5,6] │
└───────────────┘
```

View File

@ -722,7 +722,7 @@ SELECT toDate('2016-12-27') AS date, toYearWeek(date) AS yearWeek0, toYearWeek(d
## age
Returns the `unit` component of the difference between `startdate` and `enddate`. The difference is calculated using a precision of 1 second.
Returns the `unit` component of the difference between `startdate` and `enddate`. The difference is calculated using a precision of 1 microsecond.
E.g. the difference between `2021-12-29` and `2022-01-01` is 3 days for `day` unit, 0 months for `month` unit, 0 years for `year` unit.
For an alternative to `age`, see function `date\_diff`.
@ -738,6 +738,8 @@ age('unit', startdate, enddate, [timezone])
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values:
- `microsecond` (possible abbreviations: `us`, `u`)
- `millisecond` (possible abbreviations: `ms`)
- `second` (possible abbreviations: `ss`, `s`)
- `minute` (possible abbreviations: `mi`, `n`)
- `hour` (possible abbreviations: `hh`, `h`)
@ -813,6 +815,8 @@ Aliases: `dateDiff`, `DATE_DIFF`, `timestampDiff`, `timestamp_diff`, `TIMESTAMP_
- `unit` — The type of interval for result. [String](../../sql-reference/data-types/string.md).
Possible values:
- `microsecond` (possible abbreviations: `us`, `u`)
- `millisecond` (possible abbreviations: `ms`)
- `second` (possible abbreviations: `ss`, `s`)
- `minute` (possible abbreviations: `mi`, `n`)
- `hour` (possible abbreviations: `hh`, `h`)

View File

@ -1267,3 +1267,36 @@ Like [initcap](#initcap), assuming that the string contains valid UTF-8 encoded
Does not detect the language, e.g. for Turkish the result might not be exactly correct (i/İ vs. i/I).
If the length of the UTF-8 byte sequence is different for upper and lower case of a code point, the result may be incorrect for this code point.
## firstLine
Returns the first line from a multi-line string.
**Syntax**
```sql
firstLine(val)
```
**Arguments**
- `val` - Input value. [String](../data-types/string.md)
**Returned value**
- The first line of the input value or the whole value if there is no line
separators. [String](../data-types/string.md)
**Example**
```sql
select firstLine('foo\nbar\nbaz');
```
Result:
```result
┌─firstLine('foo\nbar\nbaz')─┐
│ foo │
└────────────────────────────┘
```

View File

@ -5,7 +5,27 @@ sidebar_label: WITH
# WITH Clause
ClickHouse supports Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)), that is provides to use results of `WITH` clause in the rest of `SELECT` query. Named subqueries can be included to the current and child query context in places where table objects are allowed. Recursion is prevented by hiding the current level CTEs from the WITH expression.
ClickHouse supports Common Table Expressions ([CTE](https://en.wikipedia.org/wiki/Hierarchical_and_recursive_queries_in_SQL)) and substitutes the code defined in the `WITH` clause in all places of use for the rest of `SELECT` query. Named subqueries can be included to the current and child query context in places where table objects are allowed. Recursion is prevented by hiding the current level CTEs from the WITH expression.
Please note that CTEs do not guarantee the same results in all places they are called because the query will be re-executed for each use case.
An example of such behavior is below
``` sql
with cte_numbers as
(
select
num
from generateRandom('num UInt64', NULL)
limit 1000000
)
select
count()
from cte_numbers
where num in (select num from cte_numbers)
```
If CTEs were to pass exactly the results and not just a piece of code, you would always see `1000000`
However, due to the fact that we are referring `cte_numbers` twice, random numbers are generated each time and, accordingly, we see different random results, `280501, 392454, 261636, 196227` and so on...
## Syntax

View File

@ -134,7 +134,7 @@ Multiple path components can have globs. For being processed file must exist and
- `*` — Substitutes any number of any characters except `/` including empty string.
- `?` — Substitutes any single character.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`, including `/`.
- `{N..M}` — Substitutes any number in range from N to M including both borders.
- `**` - Fetches all files inside the folder recursively.

View File

@ -4201,6 +4201,7 @@ SELECT *, timezone() FROM test_tz WHERE d = '2000-01-01 00:00:00' SETTINGS sessi
### Шаблон
Шаблон поддерживает следующие виды плейсхолдеров:
- `%a` — Полное исходное имя файла (например "sample.csv").
- `%f` — Исходное имя файла без расширения (например "sample").
- `%e` — Оригинальное расширение файла с точкой (например ".csv").
- `%t` — Текущее время (в микросекундах).

View File

@ -625,7 +625,7 @@ SELECT toDate('2016-12-27') AS date, toYearWeek(date) AS yearWeek0, toYearWeek(d
## age
Вычисляет компонент `unit` разницы между `startdate` и `enddate`. Разница вычисляется с точностью в 1 секунду.
Вычисляет компонент `unit` разницы между `startdate` и `enddate`. Разница вычисляется с точностью в 1 микросекунду.
Например, разница между `2021-12-29` и `2022-01-01` 3 дня для единицы `day`, 0 месяцев для единицы `month`, 0 лет для единицы `year`.
**Синтаксис**
@ -639,6 +639,8 @@ age('unit', startdate, enddate, [timezone])
- `unit` — единица измерения времени, в которой будет выражено возвращаемое значение функции. [String](../../sql-reference/data-types/string.md).
Возможные значения:
- `microsecond` (возможные сокращения: `us`, `u`)
- `millisecond` (возможные сокращения: `ms`)
- `second` (возможные сокращения: `ss`, `s`)
- `minute` (возможные сокращения: `mi`, `n`)
- `hour` (возможные сокращения: `hh`, `h`)
@ -712,6 +714,8 @@ date_diff('unit', startdate, enddate, [timezone])
- `unit` — единица измерения времени, в которой будет выражено возвращаемое значение функции. [String](../../sql-reference/data-types/string.md).
Возможные значения:
- `microsecond` (возможные сокращения: `us`, `u`)
- `millisecond` (возможные сокращения: `ms`)
- `second` (возможные сокращения: `ss`, `s`)
- `minute` (возможные сокращения: `mi`, `n`)
- `hour` (возможные сокращения: `hh`, `h`)

View File

@ -1124,3 +1124,39 @@ Do Nothing for 2 Minutes 2:00 &nbsp;
Не учитывает язык. То есть, для турецкого языка, результат может быть не совсем верным.
Если длина UTF-8 последовательности байтов различна для верхнего и нижнего регистра кодовой точки, то для этой кодовой точки результат работы может быть некорректным.
Если строка содержит набор байтов, не являющийся UTF-8, то поведение не определено.
## firstLine
Возвращает первую строку в многострочном тексте.
**Синтаксис**
```sql
firstLine(val)
```
**Аргументы**
- `val` - текст для обработки. [String](../data-types/string.md)
**Returned value**
- Первая строка текста или весь текст, если переносы строк отсутствуют.
Тип: [String](../data-types/string.md)
**Пример**
Запрос:
```sql
select firstLine('foo\nbar\nbaz');
```
Результат:
```result
┌─firstLine('foo\nbar\nbaz')─┐
│ foo │
└────────────────────────────┘
```

View File

@ -79,7 +79,7 @@ SELECT * FROM file('test.csv', 'CSV', 'column1 UInt32, column2 UInt32, column3 U
- `*` — заменяет любое количество любых символов кроме `/`, включая отсутствие символов.
- `?` — заменяет ровно один любой символ.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`.
- `{some_string,another_string,yet_another_one}` — заменяет любую из строк `'some_string', 'another_string', 'yet_another_one'`, причём строка может содержать `/`.
- `{N..M}` — заменяет любое число в интервале от `N` до `M` включительно (может содержать ведущие нули).
Конструкция с `{}` аналогична табличной функции [remote](remote.md).

View File

@ -643,6 +643,8 @@ date_diff('unit', startdate, enddate, [timezone])
- `unit``value`对应的时间单位。类型为[String](../../sql-reference/data-types/string.md)。
可能的值:
- `microsecond`
- `millisecond`
- `second`
- `minute`
- `hour`

View File

@ -888,6 +888,7 @@ try
#endif
global_context->setRemoteHostFilter(config());
global_context->setHTTPHeaderFilter(config());
std::string path_str = getCanonicalPath(config().getString("path", DBMS_DEFAULT_PATH));
fs::path path = path_str;
@ -1201,6 +1202,7 @@ try
}
global_context->setRemoteHostFilter(*config);
global_context->setHTTPHeaderFilter(*config);
global_context->setMaxTableSizeToDrop(server_settings_.max_table_size_to_drop);
global_context->setMaxPartitionSizeToDrop(server_settings_.max_partition_size_to_drop);

View File

@ -873,6 +873,14 @@
-->
<!--</remote_url_allow_hosts>-->
<!-- The list of HTTP headers forbidden to use in HTTP-related storage engines and table functions.
If this section is not present in configuration, all headers are allowed.
-->
<!-- <http_forbid_headers>
<header>exact_header</header>
<header_regexp>(?i)(case_insensitive_header)</header_regexp>
</http_forbid_headers> -->
<!-- If element has 'incl' attribute, then for it's value will be used corresponding substitution from another file.
By default, path to file with substitutions is /etc/metrika.xml. It could be changed in config in 'include_from' element.
Values for substitutions are specified in /clickhouse/name_of_substitution elements in that file.

View File

@ -222,7 +222,6 @@ AggregateFunctionPtr AggregateFunctionFactory::tryGet(
: nullptr;
}
std::optional<AggregateFunctionProperties> AggregateFunctionFactory::tryGetProperties(String name) const
{
if (name.size() > MAX_AGGREGATE_FUNCTION_NAME_LENGTH)

View File

@ -126,6 +126,7 @@ void registerAggregateFunctionGroupArray(AggregateFunctionFactory & factory)
factory.registerFunction("groupArray", { createAggregateFunctionGroupArray<false>, properties });
factory.registerAlias("array_agg", "groupArray", AggregateFunctionFactory::CaseInsensitive);
factory.registerAliasUnchecked("array_concat_agg", "groupArrayArray", AggregateFunctionFactory::CaseInsensitive);
factory.registerFunction("groupArraySample", { createAggregateFunctionGroupArraySample, properties });
factory.registerFunction("groupArrayLast", { createAggregateFunctionGroupArray<true>, properties });
}

View File

@ -1,10 +1,25 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionGroupArrayMoving.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <type_traits>
#define AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE 0xFFFFFF
namespace DB
@ -13,11 +28,186 @@ struct Settings;
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
template <typename T>
struct MovingData
{
/// For easy serialization.
static_assert(std::has_unique_object_representations_v<T> || std::is_floating_point_v<T>);
using Accumulator = T;
/// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value; /// Prefix sums.
T sum{};
void NO_SANITIZE_UNDEFINED add(T val, Arena * arena)
{
sum += val;
value.push_back(sum, arena);
}
};
template <typename T>
struct MovingSumData : public MovingData<T>
{
static constexpr auto name = "groupArrayMovingSum";
T NO_SANITIZE_UNDEFINED get(size_t idx, UInt64 window_size) const
{
if (idx < window_size)
return this->value[idx];
else
return this->value[idx] - this->value[idx - window_size];
}
};
template <typename T>
struct MovingAvgData : public MovingData<T>
{
static constexpr auto name = "groupArrayMovingAvg";
T NO_SANITIZE_UNDEFINED get(size_t idx, UInt64 window_size) const
{
if (idx < window_size)
return this->value[idx] / T(window_size);
else
return (this->value[idx] - this->value[idx - window_size]) / T(window_size);
}
};
template <typename T, typename LimitNumElements, typename Data>
class MovingImpl final
: public IAggregateFunctionDataHelper<Data, MovingImpl<T, LimitNumElements, Data>>
{
static constexpr bool limit_num_elems = LimitNumElements::value;
UInt64 window_size;
public:
using ResultT = typename Data::Accumulator;
using ColumnSource = ColumnVectorOrDecimal<T>;
/// Probably for overflow function in the future.
using ColumnResult = ColumnVectorOrDecimal<ResultT>;
explicit MovingImpl(const DataTypePtr & data_type_, UInt64 window_size_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<Data, MovingImpl<T, LimitNumElements, Data>>({data_type_}, {}, createResultType(data_type_))
, window_size(window_size_) {}
String getName() const override { return Data::name; }
static DataTypePtr createResultType(const DataTypePtr & argument)
{
return std::make_shared<DataTypeArray>(getReturnTypeElement(argument));
}
void NO_SANITIZE_UNDEFINED add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto value = static_cast<const ColumnSource &>(*columns[0]).getData()[row_num];
this->data(place).add(static_cast<ResultT>(value), arena);
}
void NO_SANITIZE_UNDEFINED merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
size_t cur_size = cur_elems.value.size();
if (rhs_elems.value.size())
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
for (size_t i = cur_size; i < cur_elems.value.size(); ++i)
{
cur_elems.value[i] += cur_elems.sum;
}
cur_elems.sum += rhs_elems.sum;
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size (maximum: {})", AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE);
if (size > 0)
{
auto & value = this->data(place).value;
value.resize(size, arena);
buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
this->data(place).sum = value.back();
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
const auto & data = this->data(place);
size_t size = data.value.size();
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
offsets_to.push_back(offsets_to.back() + size);
if (size)
{
typename ColumnResult::Container & data_to = assert_cast<ColumnResult &>(arr_to.getData()).getData();
for (size_t i = 0; i < size; ++i)
{
if (!limit_num_elems)
{
data_to.push_back(data.get(i, size));
}
else
{
data_to.push_back(data.get(i, window_size));
}
}
}
}
bool allocatesMemoryInArena() const override
{
return true;
}
private:
static auto getReturnTypeElement(const DataTypePtr & argument)
{
if constexpr (!is_decimal<ResultT>)
return std::make_shared<DataTypeNumber<ResultT>>();
else
{
using Res = DataTypeDecimal<ResultT>;
return std::make_shared<Res>(Res::maxPrecision(), getDecimalScale(*argument));
}
}
};
namespace
{
@ -79,7 +269,7 @@ AggregateFunctionPtr createAggregateFunctionMoving(
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive integer", name);
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() <= 0) ||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive integer", name);

View File

@ -1,207 +0,0 @@
#pragma once
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypesDecimal.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <type_traits>
#define AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE 0xFFFFFF
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
template <typename T>
struct MovingData
{
/// For easy serialization.
static_assert(std::has_unique_object_representations_v<T> || std::is_floating_point_v<T>);
using Accumulator = T;
/// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value; /// Prefix sums.
T sum{};
void NO_SANITIZE_UNDEFINED add(T val, Arena * arena)
{
sum += val;
value.push_back(sum, arena);
}
};
template <typename T>
struct MovingSumData : public MovingData<T>
{
static constexpr auto name = "groupArrayMovingSum";
T NO_SANITIZE_UNDEFINED get(size_t idx, UInt64 window_size) const
{
if (idx < window_size)
return this->value[idx];
else
return this->value[idx] - this->value[idx - window_size];
}
};
template <typename T>
struct MovingAvgData : public MovingData<T>
{
static constexpr auto name = "groupArrayMovingAvg";
T NO_SANITIZE_UNDEFINED get(size_t idx, UInt64 window_size) const
{
if (idx < window_size)
return this->value[idx] / T(window_size);
else
return (this->value[idx] - this->value[idx - window_size]) / T(window_size);
}
};
template <typename T, typename LimitNumElements, typename Data>
class MovingImpl final
: public IAggregateFunctionDataHelper<Data, MovingImpl<T, LimitNumElements, Data>>
{
static constexpr bool limit_num_elems = LimitNumElements::value;
UInt64 window_size;
public:
using ResultT = typename Data::Accumulator;
using ColumnSource = ColumnVectorOrDecimal<T>;
/// Probably for overflow function in the future.
using ColumnResult = ColumnVectorOrDecimal<ResultT>;
explicit MovingImpl(const DataTypePtr & data_type_, UInt64 window_size_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<Data, MovingImpl<T, LimitNumElements, Data>>({data_type_}, {}, createResultType(data_type_))
, window_size(window_size_) {}
String getName() const override { return Data::name; }
static DataTypePtr createResultType(const DataTypePtr & argument)
{
return std::make_shared<DataTypeArray>(getReturnTypeElement(argument));
}
void NO_SANITIZE_UNDEFINED add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto value = static_cast<const ColumnSource &>(*columns[0]).getData()[row_num];
this->data(place).add(static_cast<ResultT>(value), arena);
}
void NO_SANITIZE_UNDEFINED merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
size_t cur_size = cur_elems.value.size();
if (rhs_elems.value.size())
cur_elems.value.insert(rhs_elems.value.begin(), rhs_elems.value.end(), arena);
for (size_t i = cur_size; i < cur_elems.value.size(); ++i)
{
cur_elems.value[i] += cur_elems.sum;
}
cur_elems.sum += rhs_elems.sum;
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
const auto & value = this->data(place).value;
size_t size = value.size();
writeVarUInt(size, buf);
buf.write(reinterpret_cast<const char *>(value.data()), size * sizeof(value[0]));
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size (maximum: {})", AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE);
if (size > 0)
{
auto & value = this->data(place).value;
value.resize(size, arena);
buf.readStrict(reinterpret_cast<char *>(value.data()), size * sizeof(value[0]));
this->data(place).sum = value.back();
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
const auto & data = this->data(place);
size_t size = data.value.size();
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
offsets_to.push_back(offsets_to.back() + size);
if (size)
{
typename ColumnResult::Container & data_to = assert_cast<ColumnResult &>(arr_to.getData()).getData();
for (size_t i = 0; i < size; ++i)
{
if (!limit_num_elems)
{
data_to.push_back(data.get(i, size));
}
else
{
data_to.push_back(data.get(i, window_size));
}
}
}
}
bool allocatesMemoryInArena() const override
{
return true;
}
private:
static auto getReturnTypeElement(const DataTypePtr & argument)
{
if constexpr (!is_decimal<ResultT>)
return std::make_shared<DataTypeNumber<ResultT>>();
else
{
using Res = DataTypeDecimal<ResultT>;
return std::make_shared<Res>(Res::maxPrecision(), getDecimalScale(*argument));
}
}
};
#undef AGGREGATE_FUNCTION_MOVING_MAX_ARRAY_SIZE
}

View File

@ -319,24 +319,21 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
throw Exception(ErrorCodes::NO_AVAILABLE_REPLICA, "Logical error: no available replica");
Packet packet;
try
{
AsyncCallbackSetter async_setter(current_connection, std::move(async_callback));
try
packet = current_connection->receivePacket();
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
{
packet = current_connection->receivePacket();
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_PACKET_FROM_SERVER)
{
/// Exception may happen when packet is received, e.g. when got unknown packet.
/// In this case, invalidate replica, so that we would not read from it anymore.
current_connection->disconnect();
invalidateReplica(state);
}
throw;
/// Exception may happen when packet is received, e.g. when got unknown packet.
/// In this case, invalidate replica, so that we would not read from it anymore.
current_connection->disconnect();
invalidateReplica(state);
}
throw;
}
switch (packet.type)

View File

@ -848,6 +848,9 @@ ASTs QueryFuzzer::getDropQueriesForFuzzedTables(const ASTDropQuery & drop_query)
void QueryFuzzer::notifyQueryFailed(ASTPtr ast)
{
if (ast == nullptr)
return;
auto remove_fuzzed_table = [this](const auto & table_name)
{
auto pos = table_name.find("__fuzz_");

View File

@ -5,7 +5,6 @@ namespace DB
AsyncTaskExecutor::AsyncTaskExecutor(std::unique_ptr<AsyncTask> task_) : task(std::move(task_))
{
createFiber();
}
void AsyncTaskExecutor::resume()
@ -13,6 +12,10 @@ void AsyncTaskExecutor::resume()
if (routine_is_finished)
return;
/// Create fiber lazily on first resume() call.
if (!fiber)
createFiber();
if (!checkBeforeTaskResume())
return;
@ -22,6 +25,11 @@ void AsyncTaskExecutor::resume()
return;
resumeUnlocked();
/// Destroy fiber when it's finished.
if (routine_is_finished)
destroyFiber();
if (exception)
processException(exception);
}
@ -46,9 +54,8 @@ void AsyncTaskExecutor::cancel()
void AsyncTaskExecutor::restart()
{
std::lock_guard guard(fiber_lock);
if (fiber)
if (!routine_is_finished)
destroyFiber();
createFiber();
routine_is_finished = false;
}

View File

@ -10,7 +10,6 @@
#include <cassert>
#include <chrono>
#include <cstring>
#include <iostream>
#include <memory>

View File

@ -47,6 +47,7 @@ String FileRenamer::generateNewFilename(const String & filename) const
// Define placeholders and their corresponding values
std::map<String, String> placeholders =
{
{"%a", filename},
{"%f", file_base},
{"%e", file_ext},
{"%t", timestamp},
@ -69,16 +70,17 @@ bool FileRenamer::isEmpty() const
bool FileRenamer::validateRenamingRule(const String & rule, bool throw_on_error)
{
// Check if the rule contains invalid placeholders
re2::RE2 invalid_placeholder_pattern("^([^%]|%[fet%])*$");
re2::RE2 invalid_placeholder_pattern("^([^%]|%[afet%])*$");
if (!re2::RE2::FullMatch(rule, invalid_placeholder_pattern))
{
if (throw_on_error)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid renaming rule: Allowed placeholders only %f, %e, %t, and %%");
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Invalid renaming rule: Allowed placeholders only %a, %f, %e, %t, and %%");
return false;
}
// Replace valid placeholders with empty strings and count remaining percentage signs.
String replaced_rule = rule;
boost::replace_all(replaced_rule, "%a", "");
boost::replace_all(replaced_rule, "%f", "");
boost::replace_all(replaced_rule, "%e", "");
boost::replace_all(replaced_rule, "%t", "");

View File

@ -9,6 +9,7 @@ namespace DB
/**
* The FileRenamer class provides functionality for renaming files based on given pattern with placeholders
* The supported placeholders are:
* %a - Full original file name ("sample.csv")
* %f - Original filename without extension ("sample")
* %e - Original file extension with dot (".csv")
* %t - Timestamp (in microseconds)

View File

@ -0,0 +1,56 @@
#include <Common/HTTPHeaderFilter.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/Exception.h>
#include <re2/re2.h>
namespace DB
{
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
void HTTPHeaderFilter::checkHeaders(const HTTPHeaderEntries & entries) const
{
std::lock_guard guard(mutex);
for (const auto & entry : entries)
{
if (forbidden_headers.contains(entry.name))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HTTP header \"{}\" is forbidden in configuration file, "
"see <http_forbid_headers>", entry.name);
for (const auto & header_regex : forbidden_headers_regexp)
if (re2::RE2::FullMatch(entry.name, header_regex))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "HTTP header \"{}\" is forbidden in configuration file, "
"see <http_forbid_headers>", entry.name);
}
}
void HTTPHeaderFilter::setValuesFromConfig(const Poco::Util::AbstractConfiguration & config)
{
std::lock_guard guard(mutex);
if (config.has("http_forbid_headers"))
{
std::vector<std::string> keys;
config.keys("http_forbid_headers", keys);
for (const auto & key : keys)
{
if (startsWith(key, "header_regexp"))
forbidden_headers_regexp.push_back(config.getString("http_forbid_headers." + key));
else if (startsWith(key, "header"))
forbidden_headers.insert(config.getString("http_forbid_headers." + key));
}
}
else
{
forbidden_headers.clear();
forbidden_headers_regexp.clear();
}
}
}

View File

@ -0,0 +1,27 @@
#pragma once
#include <IO/HTTPHeaderEntries.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <vector>
#include <unordered_set>
#include <mutex>
namespace DB
{
class HTTPHeaderFilter
{
public:
void setValuesFromConfig(const Poco::Util::AbstractConfiguration & config);
void checkHeaders(const HTTPHeaderEntries & entries) const;
private:
std::unordered_set<std::string> forbidden_headers;
std::vector<std::string> forbidden_headers_regexp;
mutable std::mutex mutex;
};
}

View File

@ -52,35 +52,38 @@ public:
{
const auto & creator_map = getMap();
const auto & case_insensitive_creator_map = getCaseInsensitiveMap();
const String factory_name = getFactoryName();
String real_dict_name;
if (creator_map.count(real_name))
real_dict_name = real_name;
else if (auto real_name_lowercase = Poco::toLower(real_name); case_insensitive_creator_map.count(real_name_lowercase))
real_dict_name = real_name_lowercase;
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: can't create alias '{}', the real name '{}' is not registered",
factory_name, alias_name, real_name);
auto real_name_lowercase = Poco::toLower(real_name);
if (!creator_map.contains(real_name) && !case_insensitive_creator_map.contains(real_name_lowercase))
throw Exception(
ErrorCodes::LOGICAL_ERROR,
"{}: can't create alias '{}', the real name '{}' is not registered",
getFactoryName(),
alias_name,
real_name);
registerAliasUnchecked(alias_name, real_name, case_sensitiveness);
}
/// We need sure the real_name exactly exists when call the function directly.
void registerAliasUnchecked(const String & alias_name, const String & real_name, CaseSensitiveness case_sensitiveness = CaseSensitive)
{
String alias_name_lowercase = Poco::toLower(alias_name);
if (creator_map.count(alias_name) || case_insensitive_creator_map.count(alias_name_lowercase))
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: the alias name '{}' is already registered as real name",
factory_name, alias_name);
String real_name_lowercase = Poco::toLower(real_name);
const String factory_name = getFactoryName();
if (case_sensitiveness == CaseInsensitive)
{
if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_dict_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: case insensitive alias name '{}' is not unique",
factory_name, alias_name);
if (!case_insensitive_aliases.emplace(alias_name_lowercase, real_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: case insensitive alias name '{}' is not unique", factory_name, alias_name);
case_insensitive_name_mapping[alias_name_lowercase] = real_name;
}
if (!aliases.emplace(alias_name, real_dict_name).second)
if (!aliases.emplace(alias_name, real_name).second)
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: alias name '{}' is not unique", factory_name, alias_name);
}
std::vector<String> getAllRegisteredNames() const override
{
std::vector<String> result;
@ -93,7 +96,7 @@ public:
bool isCaseInsensitive(const String & name) const
{
String name_lowercase = Poco::toLower(name);
return getCaseInsensitiveMap().count(name_lowercase) || case_insensitive_aliases.count(name_lowercase);
return getCaseInsensitiveMap().contains(name_lowercase) || case_insensitive_aliases.contains(name_lowercase);
}
const String & aliasTo(const String & name) const
@ -106,14 +109,11 @@ public:
throw Exception(ErrorCodes::LOGICAL_ERROR, "{}: name '{}' is not alias", getFactoryName(), name);
}
bool isAlias(const String & name) const
{
return aliases.count(name) || case_insensitive_aliases.contains(name);
}
bool isAlias(const String & name) const { return aliases.contains(name) || case_insensitive_aliases.contains(name); }
bool hasNameOrAlias(const String & name) const
{
return getMap().count(name) || getCaseInsensitiveMap().count(name) || isAlias(name);
return getMap().contains(name) || getCaseInsensitiveMap().contains(name) || isAlias(name);
}
/// Return the canonical name (the name used in registration) if it's different from `name`.
@ -129,7 +129,7 @@ public:
private:
using InnerMap = std::unordered_map<String, Value>; // name -> creator
using AliasMap = std::unordered_map<String, String>; // alias -> original type
using AliasMap = std::unordered_map<String, String>; // alias -> original name
virtual const InnerMap & getMap() const = 0;
virtual const InnerMap & getCaseInsensitiveMap() const = 0;

View File

@ -25,8 +25,6 @@ void Pool::Entry::incrementRefCount()
/// First reference, initialize thread
if (data->ref_count.fetch_add(1) == 0)
mysql_thread_init();
chassert(!data->removed_from_pool);
}
@ -43,7 +41,10 @@ void Pool::Entry::decrementRefCount()
/// In Pool::Entry::disconnect() we remove connection from the list of pool's connections.
/// So now we must deallocate the memory.
if (data->removed_from_pool)
{
data->conn.disconnect();
::delete data;
}
}
}
@ -230,8 +231,6 @@ void Pool::removeConnection(Connection* connection)
std::lock_guard lock(mutex);
if (connection)
{
if (!connection->removed_from_pool)
connection->conn.disconnect();
connections.remove(connection);
connection->removed_from_pool = true;
}
@ -240,6 +239,7 @@ void Pool::removeConnection(Connection* connection)
void Pool::Entry::disconnect()
{
// Remove the Entry from the Pool. Actual disconnection is delayed until refcount == 0.
pool->removeConnection(data);
}

View File

@ -48,7 +48,11 @@ inline auto scaleMultiplier(UInt32 scale)
/** Components of DecimalX value:
* whole - represents whole part of decimal, can be negative or positive.
* fractional - for fractional part of decimal, always positive.
* fractional - for fractional part of decimal.
*
* 0.123 represents 0 / 0.123
* -0.123 represents 0 / -0.123
* -1.123 represents -1 / 0.123
*/
template <typename DecimalType>
struct DecimalComponents

View File

@ -577,6 +577,7 @@ class IColumn;
M(Bool, optimize_skip_merged_partitions, false, "Skip partitions with one part with level > 0 in optimize final", 0) \
M(Bool, optimize_on_insert, true, "Do the same transformation for inserted block of data as if merge was done on this block.", 0) \
M(Bool, optimize_use_projections, true, "Automatically choose projections to perform SELECT query", 0) ALIAS(allow_experimental_projection_optimization) \
M(Bool, optimize_use_implicit_projections, false, "Automatically choose implicit projections to perform SELECT query", 0) \
M(Bool, force_optimize_projection, false, "If projection optimization is enabled, SELECT queries need to use projection", 0) \
M(Bool, async_socket_for_remote, true, "Asynchronously read from socket executing remote query", 0) \
M(Bool, async_query_sending_for_remote, true, "Asynchronously create connections and send query to shards in remote query", 0) \
@ -736,7 +737,7 @@ class IColumn;
M(String, workload, "default", "Name of workload to be used to access resources", 0) \
M(Milliseconds, storage_system_stack_trace_pipe_read_timeout_ms, 100, "Maximum time to read from a pipe for receiving information from the threads when querying the `system.stack_trace` table. This setting is used for testing purposes and not meant to be changed by users.", 0) \
\
M(String, rename_files_after_processing, "", "Rename successfully processed files according to the specified pattern; Pattern can include the following placeholders: `%f` (original filename without extension), `%e` (file extension with dot), `%t` (current timestamp in µs), and `%%` (% sign)", 0) \
M(String, rename_files_after_processing, "", "Rename successfully processed files according to the specified pattern; Pattern can include the following placeholders: `%a` (full original file name), `%f` (original filename without extension), `%e` (file extension with dot), `%t` (current timestamp in µs), and `%%` (% sign)", 0) \
\
M(Bool, parallelize_output_from_storages, true, "Parallelize output for reading step from storage. It allows parallelizing query processing right after reading from storage if possible", 0) \
M(String, insert_deduplication_token, "", "If not empty, used for duplicate detection instead of data digest", 0) \
@ -774,6 +775,7 @@ class IColumn;
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_kvp_max_pairs_per_row, 1000, "Max number pairs that can be produced by extractKeyValuePairs function. Used to safeguard against consuming too much memory.", 0) \
M(Timezone, session_timezone, "", "The default timezone for current session or query. The server default timezone if empty.", 0) \
M(Bool, allow_create_index_without_type, false, "Allow CREATE INDEX query without TYPE. Query will be ignored. Made for SQL compatibility tests.", 0)\
// End of COMMON_SETTINGS
// Please add settings related to formats into the FORMAT_FACTORY_SETTINGS and move obsolete settings to OBSOLETE_SETTINGS.

View File

@ -80,6 +80,7 @@ namespace SettingsChangesHistory
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> settings_changes_history =
{
{"23.7", {{"optimize_use_implicit_projections", true, false, "Disable implicit projections due to unexpected results."}}},
{"23.6", {{"http_send_timeout", 180, 30, "3 minutes seems crazy long. Note that this is timeout for a single network write call, not for the whole upload operation."},
{"http_receive_timeout", 180, 30, "See http_send_timeout."}}},
{"23.5", {{"input_format_parquet_preserve_order", true, false, "Allow Parquet reader to reorder rows for better parallelism."},

View File

@ -3,6 +3,7 @@
#if USE_MYSQL
#include <Databases/MySQL/MaterializedMySQLSyncThread.h>
#include <Databases/MySQL/tryParseTableIDFromDDL.h>
#include <cstdlib>
#include <random>
#include <string_view>
@ -151,61 +152,6 @@ static void checkMySQLVariables(const mysqlxx::Pool::Entry & connection, const S
}
}
static std::tuple<String, String> tryExtractTableNameFromDDL(const String & ddl)
{
String table_name;
String database_name;
if (ddl.empty()) return std::make_tuple(database_name, table_name);
bool parse_failed = false;
Tokens tokens(ddl.data(), ddl.data() + ddl.size());
IParser::Pos pos(tokens, 0);
Expected expected;
ASTPtr res;
ASTPtr table;
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
{
ParserKeyword("IF NOT EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("ALTER TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("DROP TABLE").ignore(pos, expected) || ParserKeyword("DROP TEMPORARY TABLE").ignore(pos, expected))
{
ParserKeyword("IF EXISTS").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("TRUNCATE").ignore(pos, expected))
{
ParserKeyword("TABLE").ignore(pos, expected);
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else if (ParserKeyword("RENAME TABLE").ignore(pos, expected))
{
if (!ParserCompoundIdentifier(true).parse(pos, table, expected))
parse_failed = true;
}
else
{
parse_failed = true;
}
if (!parse_failed)
{
if (auto table_id = table->as<ASTTableIdentifier>()->getTableId())
{
database_name = table_id.database_name;
table_name = table_id.table_name;
}
}
return std::make_tuple(database_name, table_name);
}
MaterializedMySQLSyncThread::MaterializedMySQLSyncThread(
ContextPtr context_,
const String & database_name_,
@ -868,14 +814,12 @@ void MaterializedMySQLSyncThread::executeDDLAtomic(const QueryEvent & query_even
String query = query_event.query;
if (!materialized_tables_list.empty())
{
auto [ddl_database_name, ddl_table_name] = tryExtractTableNameFromDDL(query_event.query);
if (!ddl_table_name.empty())
auto table_id = tryParseTableIDFromDDL(query, query_event.schema);
if (!table_id.table_name.empty())
{
ddl_database_name = ddl_database_name.empty() ? query_event.schema: ddl_database_name;
if (ddl_database_name != mysql_database_name || !materialized_tables_list.contains(ddl_table_name))
if (table_id.database_name != mysql_database_name || !materialized_tables_list.contains(table_id.table_name))
{
LOG_DEBUG(log, "Skip MySQL DDL: \n {}", query_event.query);
LOG_DEBUG(log, "Skip MySQL DDL for {}.{}:\n{}", table_id.database_name, table_id.table_name, query);
return;
}
}

View File

@ -0,0 +1,185 @@
#include "config.h"
#include <gtest/gtest.h>
#include <Databases/MySQL/tryParseTableIDFromDDL.h>
using namespace DB;
struct ParseTableIDFromDDLTestCase
{
String query;
String database_name;
String table_name;
ParseTableIDFromDDLTestCase(
const String & query_,
const String & database_name_,
const String & table_name_)
: query(query_)
, database_name(database_name_)
, table_name(table_name_)
{
}
};
std::ostream & operator<<(std::ostream & ostr, const ParseTableIDFromDDLTestCase & test_case)
{
return ostr << '"' << test_case.query << "\" extracts `" << test_case.database_name << "`.`" << test_case.table_name << "`";
}
class ParseTableIDFromDDLTest : public ::testing::TestWithParam<ParseTableIDFromDDLTestCase>
{
};
TEST_P(ParseTableIDFromDDLTest, parse)
{
const auto & [query, expected_database_name, expected_table_name] = GetParam();
auto table_id = tryParseTableIDFromDDL(query, "default");
EXPECT_EQ(expected_database_name, table_id.database_name);
EXPECT_EQ(expected_table_name, table_id.table_name);
}
INSTANTIATE_TEST_SUITE_P(MaterializedMySQL, ParseTableIDFromDDLTest, ::testing::ValuesIn(std::initializer_list<ParseTableIDFromDDLTestCase>{
{
"SELECT * FROM db.table",
"",
""
},
{
"CREATE TEMPORARY TABLE db.table",
"db",
"table"
},
{
"CREATE TEMPORARY TABLE IF NOT EXISTS db.table",
"db",
"table"
},
{
"CREATE TEMPORARY TABLE table",
"default",
"table"
},
{
"CREATE TEMPORARY TABLE IF NOT EXISTS table",
"default",
"table"
},
{
"CREATE TABLE db.table",
"db",
"table"
},
{
"CREATE TABLE IF NOT EXISTS db.table",
"db",
"table"
},
{
"CREATE TABLE table",
"default",
"table"
},
{
"CREATE TABLE IF NOT EXISTS table",
"default",
"table"
},
{
"ALTER TABLE db.table",
"db",
"table"
},
{
"ALTER TABLE table",
"default",
"table"
},
{
"DROP TABLE db.table",
"db",
"table"
},
{
"DROP TABLE IF EXISTS db.table",
"db",
"table"
},
{
"DROP TABLE table",
"default",
"table"
},
{
"DROP TABLE IF EXISTS table",
"default",
"table"
},
{
"DROP TEMPORARY TABLE db.table",
"db",
"table"
},
{
"DROP TEMPORARY TABLE IF EXISTS db.table",
"db",
"table"
},
{
"DROP TEMPORARY TABLE table",
"default",
"table"
},
{
"DROP TEMPORARY TABLE IF EXISTS table",
"default",
"table"
},
{
"TRUNCATE db.table",
"db",
"table"
},
{
"TRUNCATE TABLE db.table",
"db",
"table"
},
{
"TRUNCATE table1",
"default",
"table1"
},
{
"TRUNCATE TABLE table",
"default",
"table"
},
{
"RENAME TABLE db.table",
"db",
"table"
},
{
"RENAME TABLE table",
"default",
"table"
},
{
"DROP DATABASE db",
"",
""
},
{
"DROP DATA`BASE db",
"",
""
},
{
"NOT A SQL",
"",
""
},
}));

View File

@ -0,0 +1,44 @@
#include <Databases/MySQL/tryParseTableIDFromDDL.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/CommonParsers.h>
#include <Parsers/ExpressionElementParsers.h>
namespace DB
{
StorageID tryParseTableIDFromDDL(const String & query, const String & default_database_name)
{
bool is_ddl = false;
Tokens tokens(query.data(), query.data() + query.size());
IParser::Pos pos(tokens, 0);
Expected expected;
if (ParserKeyword("CREATE TEMPORARY TABLE").ignore(pos, expected) || ParserKeyword("CREATE TABLE").ignore(pos, expected))
{
ParserKeyword("IF NOT EXISTS").ignore(pos, expected);
is_ddl = true;
}
else if (ParserKeyword("ALTER TABLE").ignore(pos, expected) || ParserKeyword("RENAME TABLE").ignore(pos, expected))
{
is_ddl = true;
}
else if (ParserKeyword("DROP TABLE").ignore(pos, expected) || ParserKeyword("DROP TEMPORARY TABLE").ignore(pos, expected))
{
ParserKeyword("IF EXISTS").ignore(pos, expected);
is_ddl = true;
}
else if (ParserKeyword("TRUNCATE").ignore(pos, expected))
{
ParserKeyword("TABLE").ignore(pos, expected);
is_ddl = true;
}
ASTPtr table;
if (!is_ddl || !ParserCompoundIdentifier(true).parse(pos, table, expected))
return StorageID::createEmpty();
auto table_id = table->as<ASTTableIdentifier>()->getTableId();
if (table_id.database_name.empty())
table_id.database_name = default_database_name;
return table_id;
}
}

View File

@ -0,0 +1,11 @@
#pragma once
#include <base/types.h>
#include <Storages/IStorage.h>
namespace DB
{
StorageID tryParseTableIDFromDDL(const String & query, const String & default_database_name);
}

View File

@ -257,7 +257,6 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
const auto & headers_prefix = settings_config_prefix + ".headers";
if (config.has(headers_prefix))
{
Poco::Util::AbstractConfiguration::Keys config_keys;
@ -297,7 +296,10 @@ void registerDictionarySourceHTTP(DictionarySourceFactory & factory)
auto context = copyContextAndApplySettingsFromDictionaryConfig(global_context, config, config_prefix);
if (created_from_ddl)
{
context->getRemoteHostFilter().checkURL(Poco::URI(configuration.url));
context->getHTTPHeaderFilter().checkHeaders(configuration.header_entries);
}
return std::make_unique<HTTPDictionarySource>(dict_struct, configuration, credentials, sample_block, context);
};

View File

@ -19,6 +19,9 @@
namespace DB
{
static constexpr auto microsecond_multiplier = 1000000;
static constexpr auto millisecond_multiplier = 1000;
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
@ -1377,6 +1380,36 @@ struct ToRelativeSecondNumImpl
using FactorTransform = ZeroTransform;
};
template <Int64 scale_multiplier>
struct ToRelativeSubsecondNumImpl
{
static constexpr auto name = "toRelativeSubsecondNumImpl";
static inline Int64 execute(const DateTime64 & t, DateTime64::NativeType scale, const DateLUTImpl &)
{
static_assert(scale_multiplier == 1000 || scale_multiplier == 1000000);
if (scale == scale_multiplier)
return t.value;
if (scale > scale_multiplier)
return t.value / (scale / scale_multiplier);
return t.value * (scale_multiplier / scale);
}
static inline Int64 execute(UInt32 t, const DateLUTImpl &)
{
return t * scale_multiplier;
}
static inline Int64 execute(Int32 d, const DateLUTImpl & time_zone)
{
return static_cast<Int64>(time_zone.fromDayNum(ExtendedDayNum(d))) * scale_multiplier;
}
static inline Int64 execute(UInt16 d, const DateLUTImpl & time_zone)
{
return static_cast<Int64>(time_zone.fromDayNum(DayNum(d)) * scale_multiplier);
}
using FactorTransform = ZeroTransform;
};
struct ToYYYYMMImpl
{
static constexpr auto name = "toYYYYMM";
@ -1476,25 +1509,47 @@ struct ToYYYYMMDDhhmmssImpl
using FactorTransform = ZeroTransform;
};
struct DateTimeComponentsWithFractionalPart : public DateLUTImpl::DateTimeComponents
{
UInt16 millisecond;
UInt16 microsecond;
};
struct ToDateTimeComponentsImpl
{
static constexpr auto name = "toDateTimeComponents";
static inline DateLUTImpl::DateTimeComponents execute(Int64 t, const DateLUTImpl & time_zone)
static inline DateTimeComponentsWithFractionalPart execute(const DateTime64 & t, DateTime64::NativeType scale_multiplier, const DateLUTImpl & time_zone)
{
return time_zone.toDateTimeComponents(t);
auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier);
if (t.value < 0 && components.fractional)
{
components.fractional = scale_multiplier + (components.whole ? Int64(-1) : Int64(1)) * components.fractional;
--components.whole;
}
Int64 fractional = components.fractional;
if (scale_multiplier > microsecond_multiplier)
fractional = fractional / (scale_multiplier / microsecond_multiplier);
else if (scale_multiplier < microsecond_multiplier)
fractional = fractional * (microsecond_multiplier / scale_multiplier);
constexpr Int64 divider = microsecond_multiplier/ millisecond_multiplier;
UInt16 millisecond = static_cast<UInt16>(fractional / divider);
UInt16 microsecond = static_cast<UInt16>(fractional % divider);
return DateTimeComponentsWithFractionalPart{time_zone.toDateTimeComponents(components.whole), millisecond, microsecond};
}
static inline DateLUTImpl::DateTimeComponents execute(UInt32 t, const DateLUTImpl & time_zone)
static inline DateTimeComponentsWithFractionalPart execute(UInt32 t, const DateLUTImpl & time_zone)
{
return time_zone.toDateTimeComponents(static_cast<DateLUTImpl::Time>(t));
return DateTimeComponentsWithFractionalPart{time_zone.toDateTimeComponents(static_cast<DateLUTImpl::Time>(t)), 0, 0};
}
static inline DateLUTImpl::DateTimeComponents execute(Int32 d, const DateLUTImpl & time_zone)
static inline DateTimeComponentsWithFractionalPart execute(Int32 d, const DateLUTImpl & time_zone)
{
return time_zone.toDateTimeComponents(ExtendedDayNum(d));
return DateTimeComponentsWithFractionalPart{time_zone.toDateTimeComponents(ExtendedDayNum(d)), 0, 0};
}
static inline DateLUTImpl::DateTimeComponents execute(UInt16 d, const DateLUTImpl & time_zone)
static inline DateTimeComponentsWithFractionalPart execute(UInt16 d, const DateLUTImpl & time_zone)
{
return time_zone.toDateTimeComponents(DayNum(d));
return DateTimeComponentsWithFractionalPart{time_zone.toDateTimeComponents(DayNum(d)), 0, 0};
}
using FactorTransform = ZeroTransform;

View File

@ -1112,6 +1112,11 @@ private:
bool c0_const = isColumnConst(*c0);
bool c1_const = isColumnConst(*c1);
/// This is a paranoid check to protect from a broken query analysis.
if (c0->isNullable() != c1->isNullable())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Logical error: columns are assumed to be of identical types, but they are different in Nullable");
if (c0_const && c1_const)
{
UInt8 res = 0;

View File

@ -39,6 +39,9 @@ struct HasTokenImpl
if (start_pos != nullptr)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Function '{}' does not support start_pos argument", name);
if (pattern.empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Needle cannot be empty, because empty string isn't a token");
if (haystack_offsets.empty())
return;

View File

@ -7,8 +7,8 @@
namespace DB
{
/** URL processing functions. See implementation in separate .cpp files.
* All functions are not strictly follow RFC, instead they are maximally simplified for performance reasons.
/** These helpers are used by URL processing functions. See implementation in separate .cpp files.
* All functions do not strictly follow RFC, instead they are maximally simplified for performance reasons.
*
* Functions for extraction parts of URL.
* If URL has nothing like, then empty string is returned.
@ -101,7 +101,7 @@ struct ExtractSubstringImpl
static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Column of type FixedString is not supported by URL functions");
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Column of type FixedString is not supported by this function");
}
};
@ -156,7 +156,7 @@ struct CutSubstringImpl
static void vectorFixed(const ColumnString::Chars &, size_t, ColumnString::Chars &)
{
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Column of type FixedString is not supported by URL functions");
throw Exception(ErrorCodes::ILLEGAL_COLUMN, "Column of type FixedString is not supported by this function");
}
};

View File

@ -5,7 +5,7 @@
namespace DB
{
/** Tansform-type wrapper for DateTime64, simplifies DateTime64 support for given Transform.
/** Transform-type wrapper for DateTime64, simplifies DateTime64 support for given Transform.
*
* Depending on what overloads of Transform::execute() are available, when called with DateTime64 value,
* invokes Transform::execute() with either:
@ -80,7 +80,10 @@ public:
}
else
{
const auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier);
auto components = DecimalUtils::splitWithScaleMultiplier(t, scale_multiplier);
if (t.value < 0 && components.fractional)
--components.whole;
return wrapped_transform.execute(static_cast<Int64>(components.whole), std::forward<Args>(args)...);
}
}

View File

@ -1,8 +1,8 @@
#pragma once
#include <Functions/FunctionFactory.h>
#include <Functions/URL/FunctionsURL.h>
#include <Functions/FunctionHelpers.h>
#include <Functions/StringHelpers.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnFixedString.h>

View File

@ -1,7 +1,7 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Functions/StringHelpers.h>
#include <base/find_symbols.h>
#include "FunctionsURL.h"
namespace DB
{

View File

@ -1,7 +1,7 @@
#pragma once
#include "FunctionsURL.h"
#include <base/find_symbols.h>
#include <Functions/StringHelpers.h>
namespace DB
{

View File

@ -1,7 +1,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Functions/URL/FunctionsURL.h>
#include <Functions/StringHelpers.h>
namespace DB
@ -154,4 +154,3 @@ REGISTER_FUNCTION(Netloc)
}
}

View File

@ -1,6 +1,6 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include "FunctionsURL.h"
#include <Functions/StringHelpers.h>
#include "path.h"
#include <base/find_symbols.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include <base/find_symbols.h>
#include <Functions/URL/FunctionsURL.h>
#include <Functions/StringHelpers.h>
namespace DB

View File

@ -1,6 +1,6 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include "FunctionsURL.h"
#include <Functions/StringHelpers.h>
#include "path.h"
#include <base/find_symbols.h>

View File

@ -1,7 +1,7 @@
#pragma once
#include "FunctionsURL.h"
#include <Common/StringUtils/StringUtils.h>
#include <Functions/StringHelpers.h>
namespace DB
@ -54,4 +54,3 @@ struct ExtractProtocol
};
}

View File

@ -1,7 +1,7 @@
#pragma once
#include "FunctionsURL.h"
#include <base/find_symbols.h>
#include <Functions/StringHelpers.h>
namespace DB

View File

@ -1,7 +1,7 @@
#pragma once
#include "FunctionsURL.h"
#include <base/find_symbols.h>
#include <Functions/StringHelpers.h>
namespace DB
@ -34,4 +34,3 @@ struct ExtractQueryStringAndFragment
};
}

View File

@ -174,12 +174,13 @@ public:
{
auto res = static_cast<Int64>(transform_y.execute(y, timezone_y))
- static_cast<Int64>(transform_x.execute(x, timezone_x));
DateLUTImpl::DateTimeComponents a_comp;
DateLUTImpl::DateTimeComponents b_comp;
DateTimeComponentsWithFractionalPart a_comp;
DateTimeComponentsWithFractionalPart b_comp;
Int64 adjust_value;
auto x_seconds = TransformDateTime64<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(transform_x.getScaleMultiplier()).execute(x, timezone_x);
auto y_seconds = TransformDateTime64<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(transform_y.getScaleMultiplier()).execute(y, timezone_y);
if (x_seconds <= y_seconds)
auto x_microseconds = TransformDateTime64<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(transform_x.getScaleMultiplier()).execute(x, timezone_x);
auto y_microseconds = TransformDateTime64<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(transform_y.getScaleMultiplier()).execute(y, timezone_y);
if (x_microseconds <= y_microseconds)
{
a_comp = TransformDateTime64<ToDateTimeComponentsImpl>(transform_x.getScaleMultiplier()).execute(x, timezone_x);
b_comp = TransformDateTime64<ToDateTimeComponentsImpl>(transform_y.getScaleMultiplier()).execute(y, timezone_y);
@ -192,14 +193,16 @@ public:
adjust_value = 1;
}
if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeYearNumImpl<ResultPrecision::Extended>>>)
{
if ((a_comp.date.month > b_comp.date.month)
|| ((a_comp.date.month == b_comp.date.month) && ((a_comp.date.day > b_comp.date.day)
|| ((a_comp.date.day == b_comp.date.day) && ((a_comp.time.hour > b_comp.time.hour)
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second))))
)))))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeQuarterNumImpl<ResultPrecision::Extended>>>)
@ -210,8 +213,9 @@ public:
|| ((x_month_in_quarter == y_month_in_quarter) && ((a_comp.date.day > b_comp.date.day)
|| ((a_comp.date.day == b_comp.date.day) && ((a_comp.time.hour > b_comp.time.hour)
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second))))
)))))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeMonthNumImpl<ResultPrecision::Extended>>>)
@ -219,8 +223,9 @@ public:
if ((a_comp.date.day > b_comp.date.day)
|| ((a_comp.date.day == b_comp.date.day) && ((a_comp.time.hour > b_comp.time.hour)
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second))))
)))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeWeekNumImpl<ResultPrecision::Extended>>>)
@ -230,25 +235,44 @@ public:
if ((x_day_of_week > y_day_of_week)
|| ((x_day_of_week == y_day_of_week) && (a_comp.time.hour > b_comp.time.hour))
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second)))))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeDayNumImpl<ResultPrecision::Extended>>>)
{
if ((a_comp.time.hour > b_comp.time.hour)
|| ((a_comp.time.hour == b_comp.time.hour) && ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second)))))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeHourNumImpl<ResultPrecision::Extended>>>)
{
if ((a_comp.time.minute > b_comp.time.minute)
|| ((a_comp.time.minute == b_comp.time.minute) && (a_comp.time.second > b_comp.time.second)))
|| ((a_comp.time.minute == b_comp.time.minute) && ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeMinuteNumImpl<ResultPrecision::Extended>>>)
{
if (a_comp.time.second > b_comp.time.second)
if ((a_comp.time.second > b_comp.time.second)
|| ((a_comp.time.second == b_comp.time.second) && ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeSecondNumImpl<ResultPrecision::Extended>>>)
{
if ((a_comp.millisecond > b_comp.millisecond)
|| ((a_comp.millisecond == b_comp.millisecond) && (a_comp.microsecond > b_comp.microsecond)))
res += adjust_value;
}
else if constexpr (std::is_same_v<TransformX, TransformDateTime64<ToRelativeSubsecondNumImpl<1000>>>)
{
if (a_comp.microsecond > b_comp.microsecond)
res += adjust_value;
}
return res;
@ -373,6 +397,10 @@ public:
impl.template dispatchForColumns<ToRelativeMinuteNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "second" || unit == "ss" || unit == "s")
impl.template dispatchForColumns<ToRelativeSecondNumImpl<ResultPrecision::Extended>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "millisecond" || unit == "ms")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<millisecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else if (unit == "microsecond" || unit == "us" || unit == "u")
impl.template dispatchForColumns<ToRelativeSubsecondNumImpl<microsecond_multiplier>>(x, y, timezone_x, timezone_y, res->getData());
else
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Function {} does not support '{}' unit", getName(), unit);

View File

@ -0,0 +1,42 @@
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionStringToString.h>
#include <Functions/StringHelpers.h>
#include <base/find_symbols.h>
namespace DB
{
struct FirstLine
{
static size_t getReserveLengthForElement() { return 16; }
static void execute(Pos data, size_t size, Pos & res_data, size_t & res_size)
{
res_data = data;
const Pos end = data + size;
const Pos pos = find_first_symbols<'\r', '\n'>(data, end);
res_size = pos - data;
}
};
struct NameFirstLine
{
static constexpr auto name = "firstLine";
};
using FunctionFirstLine = FunctionStringToString<ExtractSubstringImpl<FirstLine>, NameFirstLine>;
REGISTER_FUNCTION(FirstLine)
{
factory.registerFunction<FunctionFirstLine>(FunctionDocumentation{
.description = "Returns first line of a multi-line string.",
.syntax = "firstLine(string)",
.arguments = {{.name = "string", .description = "The string to process."}},
.returned_value = {"The first line of the string or the whole string if there is no line separators."},
.examples = {
{.name = "Return first line", .query = "firstLine('Hello\\nWorld')", .result = "'Hello'"},
{.name = "Return whole string", .query = "firstLine('Hello World')", .result = "'Hello World'"},
}});
}
}

View File

@ -10,7 +10,6 @@
#include <Functions/DateTimeTransforms.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>
#include <Functions/TransformDateTime64.h>
#include <IO/WriteHelpers.h>

View File

@ -16,19 +16,15 @@
#include <DataTypes/DataTypeSet.h>
#include <DataTypes/DataTypeFunction.h>
#include <DataTypes/DataTypeFixedString.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeTuple.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeFactory.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnFixedString.h>
#include <Columns/ColumnSet.h>
#include <Storages/StorageSet.h>
@ -47,7 +43,6 @@
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/misc.h>
#include <Interpreters/ActionsVisitor.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/Set.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/convertFieldToType.h>
@ -61,6 +56,7 @@
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
#include <Parsers/queryToString.h>
namespace DB
{
@ -715,7 +711,7 @@ bool ActionsMatcher::needChildVisit(const ASTPtr & node, const ASTPtr & child)
node->as<ASTExpressionList>())
return false;
/// Do not go to FROM, JOIN, UNION.
/// Do not go to FROM, JOIN, UNION
if (child->as<ASTTableExpression>() ||
child->as<ASTSelectQuery>())
return false;

View File

@ -97,6 +97,10 @@ UInt128 AsynchronousInsertQueue::InsertQuery::calculateHash() const
for (const auto & setting : settings.allChanged())
{
/// We don't consider this setting because it is only for deduplication,
/// which means we can put two inserts with different tokens in the same block safely.
if (setting.getName() == "insert_deduplication_token")
continue;
siphash.update(setting.getName());
applyVisitor(FieldVisitorHash(siphash), setting.getValue());
}
@ -111,9 +115,10 @@ bool AsynchronousInsertQueue::InsertQuery::operator==(const InsertQuery & other)
return query_str == other.query_str && settings == other.settings;
}
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_)
AsynchronousInsertQueue::InsertData::Entry::Entry(String && bytes_, String && query_id_, const String & async_dedup_token_, MemoryTracker * user_memory_tracker_)
: bytes(std::move(bytes_))
, query_id(std::move(query_id_))
, async_dedup_token(async_dedup_token_)
, user_memory_tracker(user_memory_tracker_)
, create_time(std::chrono::system_clock::now())
{
@ -227,7 +232,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
/// to avoid buffering of huge amount of data in memory.
auto read_buf = getReadBufferFromASTInsertQuery(query);
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, /* trow_exception */ false, /* exact_limit */ {});
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, /* throw_exception */ false, /* exact_limit */ {});
WriteBufferFromString write_buf(bytes);
copyData(limit_buf, write_buf);
@ -253,7 +258,7 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
if (auto quota = query_context->getQuota())
quota->used(QuotaType::WRITTEN_BYTES, bytes.size());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId(), CurrentThread::getUserMemoryTracker());
auto entry = std::make_shared<InsertData::Entry>(std::move(bytes), query_context->getCurrentQueryId(), settings.insert_deduplication_token, CurrentThread::getUserMemoryTracker());
InsertQuery key{query, settings};
InsertDataPtr data_to_process;
@ -517,7 +522,7 @@ try
StreamingFormatExecutor executor(header, format, std::move(on_error), std::move(adding_defaults_transform));
std::unique_ptr<ReadBuffer> last_buffer;
auto chunk_info = std::make_shared<ChunkOffsets>();
auto chunk_info = std::make_shared<AsyncInsertInfo>();
for (const auto & entry : data->entries)
{
auto buffer = std::make_unique<ReadBufferFromString>(entry->bytes);
@ -526,6 +531,7 @@ try
size_t num_rows = executor.execute(*buffer);
total_rows += num_rows;
chunk_info->offsets.push_back(total_rows);
chunk_info->tokens.push_back(entry->async_dedup_token);
/// Keep buffer, because it still can be used
/// in destructor, while resetting buffer at next iteration.

View File

@ -69,10 +69,11 @@ private:
public:
String bytes;
const String query_id;
const String async_dedup_token;
MemoryTracker * const user_memory_tracker;
const std::chrono::time_point<std::chrono::system_clock> create_time;
Entry(String && bytes_, String && query_id_, MemoryTracker * user_memory_tracker_);
Entry(String && bytes_, String && query_id_, const String & async_dedup_token, MemoryTracker * user_memory_tracker_);
void finish(std::exception_ptr exception_ = nullptr);
std::future<void> getFuture() { return promise.get_future(); }

View File

@ -101,6 +101,7 @@
#include <Common/logger_useful.h>
#include <base/EnumReflection.h>
#include <Common/RemoteHostFilter.h>
#include <Common/HTTPHeaderFilter.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <Interpreters/DatabaseCatalog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
@ -336,9 +337,10 @@ struct ContextSharedPart : boost::noncopyable
OrdinaryBackgroundExecutorPtr fetch_executor;
OrdinaryBackgroundExecutorPtr common_executor;
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
RemoteHostFilter remote_host_filter; /// Allowed URL from config.xml
HTTPHeaderFilter http_header_filter; /// Forbidden HTTP headers from config.xml
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
std::optional<TraceCollector> trace_collector; /// Thread collecting traces from threads executing queries
/// Clusters for distributed tables
/// Initialized on demand (on distributed storages initialization) since Settings should be initialized
@ -3007,6 +3009,16 @@ const RemoteHostFilter & Context::getRemoteHostFilter() const
return shared->remote_host_filter;
}
void Context::setHTTPHeaderFilter(const Poco::Util::AbstractConfiguration & config)
{
shared->http_header_filter.setValuesFromConfig(config);
}
const HTTPHeaderFilter & Context::getHTTPHeaderFilter() const
{
return shared->http_header_filter;
}
UInt16 Context::getTCPPort() const
{
auto lock = getLock();

View File

@ -6,6 +6,7 @@
#include <Common/isLocalAddress.h>
#include <Common/MultiVersion.h>
#include <Common/RemoteHostFilter.h>
#include <Common/HTTPHeaderFilter.h>
#include <Common/ThreadPool_fwd.h>
#include <Common/Throttler_fwd.h>
#include <Core/NamesAndTypes.h>
@ -769,6 +770,10 @@ public:
void setRemoteHostFilter(const Poco::Util::AbstractConfiguration & config);
const RemoteHostFilter & getRemoteHostFilter() const;
/// Storage of forbidden HTTP headers from config.xml
void setHTTPHeaderFilter(const Poco::Util::AbstractConfiguration & config);
const HTTPHeaderFilter & getHTTPHeaderFilter() const;
/// The port that the server listens for executing SQL queries.
UInt16 getTCPPort() const;

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TABLE_IS_READ_ONLY;
extern const int INCORRECT_QUERY;
}
@ -23,6 +24,21 @@ BlockIO InterpreterCreateIndexQuery::execute()
auto current_context = getContext();
const auto & create_index = query_ptr->as<ASTCreateIndexQuery &>();
// Noop if allow_create_index_without_type = true. throw otherwise
if (!create_index.index_decl->as<ASTIndexDeclaration>()->type)
{
if (!current_context->getSettingsRef().allow_create_index_without_type)
{
throw Exception(ErrorCodes::INCORRECT_QUERY, "CREATE INDEX without TYPE is forbidden."
" SET allow_create_index_without_type=1 to ignore this statements.");
}
else
{
// Nothing to do
return {};
}
}
AccessRightsElements required_access;
required_access.emplace_back(AccessType::ALTER_ADD_INDEX, create_index.getDatabase(), create_index.getTable());

View File

@ -1,27 +1,24 @@
#include <Interpreters/evaluateConstantExpression.h>
#include <Columns/ColumnConst.h>
#include <Columns/ColumnsNumber.h>
#include <Core/Block.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/FieldToDataType.h>
#include <Interpreters/Context.h>
#include <Interpreters/convertFieldToType.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/TreeRewriter.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSubquery.h>
#include <Parsers/ExpressionElementParsers.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Common/typeid_cast.h>
#include <Interpreters/FunctionNameNormalizer.h>
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Poco/Util/AbstractConfiguration.h>
#include <unordered_map>
namespace DB
{
@ -94,18 +91,18 @@ std::pair<Field, std::shared_ptr<const IDataType>> evaluateConstantExpression(co
if (!result_column)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Element of set in IN, VALUES or LIMIT or aggregate function parameter "
"Element of set in IN, VALUES, or LIMIT, or aggregate function parameter, or a table function argument "
"is not a constant expression (result column not found): {}", result_name);
if (result_column->empty())
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Logical error: empty result column after evaluation "
"of constant expression for IN, VALUES or LIMIT or aggregate function parameter");
"of constant expression for IN, VALUES, or LIMIT, or aggregate function parameter, or a table function argument");
/// Expressions like rand() or now() are not constant
if (!isColumnConst(*result_column))
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Element of set in IN, VALUES or LIMIT or aggregate function parameter "
"Element of set in IN, VALUES, or LIMIT, or aggregate function parameter, or a table function argument "
"is not a constant expression (result column is not const): {}", result_name);
return std::make_pair((*result_column)[0], result_type);

View File

@ -56,8 +56,7 @@ void ASTCreateIndexQuery::formatQueryImpl(const FormatSettings & settings, Forma
formatOnCluster(settings);
if (!cluster.empty())
settings.ostr << " ";
settings.ostr << " ";
index_decl->formatImpl(settings, state, frame);
}

View File

@ -13,8 +13,8 @@ ASTPtr ASTIndexDeclaration::clone() const
auto res = std::make_shared<ASTIndexDeclaration>();
res->name = name;
res->granularity = granularity;
if (granularity)
res->granularity = granularity;
if (expr)
res->set(res->expr, expr->clone());
if (type)
@ -25,23 +25,37 @@ ASTPtr ASTIndexDeclaration::clone() const
void ASTIndexDeclaration::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const
{
if (part_of_create_index_query)
if (expr)
{
s.ostr << "(";
expr->formatImpl(s, state, frame);
s.ostr << ")";
}
else
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
if (part_of_create_index_query)
{
if (expr->as<ASTExpressionList>())
{
s.ostr << "(";
expr->formatImpl(s, state, frame);
s.ostr << ")";
}
else
expr->formatImpl(s, state, frame);
}
else
{
s.ostr << backQuoteIfNeed(name);
s.ostr << " ";
expr->formatImpl(s, state, frame);
}
}
s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : "");
type->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " GRANULARITY " << (s.hilite ? hilite_none : "");
s.ostr << granularity;
if (type)
{
s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : "");
type->formatImpl(s, state, frame);
}
if (granularity)
{
s.ostr << (s.hilite ? hilite_keyword : "") << " GRANULARITY " << (s.hilite ? hilite_none : "");
s.ostr << granularity;
}
}
}

View File

@ -64,4 +64,14 @@ void ASTSetQuery::formatImpl(const FormatSettings & format, FormatState &, Forma
}
}
void ASTSetQuery::appendColumnName(WriteBuffer & ostr) const
{
Hash hash = getTreeHash();
writeCString("__settings_", ostr);
writeText(hash.first, ostr);
ostr.write('_');
writeText(hash.second, ostr);
}
}

View File

@ -37,6 +37,9 @@ public:
void updateTreeHashImpl(SipHash & hash_state) const override;
QueryKind getQueryKind() const override { return QueryKind::Set; }
void appendColumnName(WriteBuffer & ostr) const override;
void appendColumnNameWithoutAlias(WriteBuffer & ostr) const override { return appendColumnName(ostr); }
};
}

View File

@ -17,24 +17,36 @@ bool ParserCreateIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected
{
ParserKeyword s_type("TYPE");
ParserKeyword s_granularity("GRANULARITY");
ParserToken open(TokenType::OpeningRoundBracket);
ParserToken close(TokenType::ClosingRoundBracket);
ParserOrderByExpressionList order_list;
ParserDataType data_type_p;
ParserExpression expression_p;
ParserUnsignedInteger granularity_p;
ASTPtr expr;
ASTPtr order;
ASTPtr type;
ASTPtr granularity;
/// Skip name parser for SQL-standard CREATE INDEX
if (!expression_p.parse(pos, expr, expected))
return false;
if (expression_p.parse(pos, expr, expected))
{
}
else if (open.ignore(pos, expected))
{
if (!order_list.parse(pos, order, expected))
return false;
if (!s_type.ignore(pos, expected))
return false;
if (!close.ignore(pos, expected))
return false;
}
if (!data_type_p.parse(pos, type, expected))
return false;
if (s_type.ignore(pos, expected))
{
if (!data_type_p.parse(pos, type, expected))
return false;
}
if (s_granularity.ignore(pos, expected))
{
@ -45,13 +57,14 @@ bool ParserCreateIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected
auto index = std::make_shared<ASTIndexDeclaration>();
index->part_of_create_index_query = true;
index->set(index->expr, expr);
index->set(index->type, type);
if (type)
index->set(index->type, type);
if (granularity)
index->granularity = granularity->as<ASTLiteral &>().value.safeGet<UInt64>();
else
{
if (index->type->name == "annoy")
if (index->type && index->type->name == "annoy")
index->granularity = ASTIndexDeclaration::DEFAULT_ANNOY_INDEX_GRANULARITY;
else
index->granularity = ASTIndexDeclaration::DEFAULT_INDEX_GRANULARITY;

View File

@ -114,16 +114,20 @@ private:
using Chunks = std::vector<Chunk>;
/// ChunkOffsets marks offsets of different sub-chunks, which will be used by async inserts.
class ChunkOffsets : public ChunkInfo
/// AsyncInsert needs two kinds of information:
/// - offsets of different sub-chunks
/// - tokens of different sub-chunks, which are assigned by setting `insert_deduplication_token`.
class AsyncInsertInfo : public ChunkInfo
{
public:
ChunkOffsets() = default;
explicit ChunkOffsets(const std::vector<size_t> & offsets_) : offsets(offsets_) {}
AsyncInsertInfo() = default;
explicit AsyncInsertInfo(const std::vector<size_t> & offsets_, const std::vector<String> & tokens_) : offsets(offsets_), tokens(tokens_) {}
std::vector<size_t> offsets;
std::vector<String> tokens;
};
using ChunkOffsetsPtr = std::shared_ptr<ChunkOffsets>;
using AsyncInsertInfoPtr = std::shared_ptr<AsyncInsertInfo>;
/// Extension to support delayed defaults. AddingDefaultsProcessor uses it to replace missing values with column defaults.
class ChunkMissingValues : public ChunkInfo

View File

@ -75,7 +75,7 @@ public:
{
if (!allow_missing_columns)
throw Exception(
ErrorCodes::THERE_IS_NO_COLUMN, "Not found field({}) in arrow schema:{}.", named_col.name, schema.ToString());
ErrorCodes::THERE_IS_NO_COLUMN, "Not found field ({}) in the following Arrow schema:\n{}\n", named_col.name, schema.ToString());
else
continue;
}
@ -168,4 +168,3 @@ private:
};
}
#endif

View File

@ -111,7 +111,7 @@ void optimizePrimaryKeyCondition(const Stack & stack);
void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes);
void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
void optimizeAggregationInOrder(QueryPlan::Node & node, QueryPlan::Nodes &);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes);
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections);
bool optimizeUseNormalProjections(Stack & stack, QueryPlan::Nodes & nodes);
bool addPlansForSets(QueryPlan::Node & node, QueryPlan::Nodes & nodes);

View File

@ -19,6 +19,7 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.remove_redundant_distinct = from.query_plan_remove_redundant_distinct;
settings.optimize_projection = from.optimize_use_projections && from.query_plan_optimize_projection;
settings.force_use_projection = settings.optimize_projection && from.force_optimize_projection;
settings.optimize_use_implicit_projections = settings.optimize_projection && from.optimize_use_implicit_projections;
return settings;
}

View File

@ -41,6 +41,7 @@ struct QueryPlanOptimizationSettings
/// If reading from projection can be applied
bool optimize_projection = false;
bool force_use_projection = false;
bool optimize_use_implicit_projections = false;
static QueryPlanOptimizationSettings fromSettings(const Settings & from);
static QueryPlanOptimizationSettings fromContext(ContextPtr from);

View File

@ -4,6 +4,7 @@
#include <Processors/QueryPlan/Optimizations/Optimizations.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Common/Exception.h>
#include <DataTypes/IDataType.h>
namespace DB
{
@ -28,6 +29,20 @@ const DB::DataStream & getChildOutputStream(DB::QueryPlan::Node & node)
namespace DB::QueryPlanOptimizations
{
/// This is a check that output columns does not have the same name
/// This is ok for DAG, but may introduce a bug in a SotringStep cause columns are selected by name.
static bool areOutputsConvertableToBlock(const ActionsDAG::NodeRawConstPtrs & outputs)
{
std::unordered_set<std::string_view> names;
for (const auto & output : outputs)
{
if (!names.emplace(output->result_name).second)
return false;
}
return true;
}
size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan::Nodes & nodes)
{
if (parent_node->children.size() != 1)
@ -57,6 +72,9 @@ size_t tryExecuteFunctionsAfterSorting(QueryPlan::Node * parent_node, QueryPlan:
if (unneeded_for_sorting->trivial())
return 0;
if (!areOutputsConvertableToBlock(needed_for_sorting->getOutputs()))
return 0;
// Sorting (parent_node) -> Expression (child_node)
auto & node_with_needed = nodes.emplace_back();
std::swap(node_with_needed.children, child_node->children);

View File

@ -126,7 +126,8 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
optimizeReadInOrder(*frame.node, nodes);
if (optimization_settings.optimize_projection)
num_applied_projection += optimizeUseAggregateProjections(*frame.node, nodes);
num_applied_projection
+= optimizeUseAggregateProjections(*frame.node, nodes, optimization_settings.optimize_use_implicit_projections);
if (optimization_settings.aggregation_in_order)
optimizeAggregationInOrder(*frame.node, nodes);

View File

@ -433,7 +433,8 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
QueryPlan::Node & node,
AggregatingStep & aggregating,
ReadFromMergeTree & reading,
const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks)
const std::shared_ptr<PartitionIdToMaxBlock> & max_added_blocks,
bool allow_implicit_projections)
{
const auto & keys = aggregating.getParams().keys;
const auto & aggregates = aggregating.getParams().aggregates;
@ -453,7 +454,8 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
if (projection.type == ProjectionDescription::Type::Aggregate)
agg_projections.push_back(&projection);
bool can_use_minmax_projection = metadata->minmax_count_projection && !reading.getMergeTreeData().has_lightweight_delete_parts.load();
bool can_use_minmax_projection = allow_implicit_projections && metadata->minmax_count_projection
&& !reading.getMergeTreeData().has_lightweight_delete_parts.load();
if (!can_use_minmax_projection && agg_projections.empty())
return candidates;
@ -543,7 +545,7 @@ static QueryPlan::Node * findReadingStep(QueryPlan::Node & node)
return nullptr;
}
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes & nodes, bool allow_implicit_projections)
{
if (node.children.size() != 1)
return false;
@ -568,7 +570,7 @@ bool optimizeUseAggregateProjections(QueryPlan::Node & node, QueryPlan::Nodes &
std::shared_ptr<PartitionIdToMaxBlock> max_added_blocks = getMaxAddedBlocks(reading);
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks);
auto candidates = getAggregateProjectionCandidates(node, *aggregating, *reading, max_added_blocks, allow_implicit_projections);
AggregateProjectionCandidate * best_candidate = nullptr;
if (candidates.minmax_projection)

View File

@ -64,23 +64,131 @@ namespace ErrorCodes
}
namespace
{
/// Forward-declared to use in LSWithFoldedRegexpMatching w/o circular dependency.
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match);
/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by LSWithRegexpMatching. In case it detects multi-dir glob {.../..., .../...},
* LSWithFoldedRegexpMatching is in charge from now on.
* It works a bit different: it still recursively goes through subdirectories, but does not match every directory to glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageFile.cpp has the same logic.
*/
std::vector<StorageHDFS::PathWithInfo> LSWithFoldedRegexpMatching(const String & path_for_ls,
const HDFSFSPtr & fs,
const String & processed_suffix,
const String & suffix_with_globs,
re2::RE2 & matcher,
const size_t max_depth,
const size_t next_slash_after_glob_pos)
{
/// We don't need to go all the way in every directory if max_depth is reached
/// as it is upper limit of depth by simply counting `/`s in curly braces
if (!max_depth)
return {};
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), path_for_ls.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
{
// ignore file not found exception, keep throw other exception, libhdfs3 doesn't have function to get exception type, so use errno.
throw Exception(
ErrorCodes::ACCESS_DENIED, "Cannot list directory {}: {}", path_for_ls, String(hdfsGetLastError()));
}
std::vector<StorageHDFS::PathWithInfo> result;
if (!ls.file_info && ls.length > 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "file_info shouldn't be null");
for (int i = 0; i < ls.length; ++i)
{
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);
const bool is_directory = ls.file_info[i].mKind == 'D';
if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
result.emplace_back(
String(ls.file_info[i].mName),
StorageHDFS::PathInfo{ls.file_info[i].mLastMod, static_cast<size_t>(ls.file_info[i].mSize)});
}
else
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(
fs::path(full_path) / "" , fs, suffix_with_globs.substr(next_slash_after_glob_pos));
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
else if (is_directory)
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithFoldedRegexpMatching(
fs::path(full_path), fs, processed_suffix + dir_or_file_name,
suffix_with_globs, matcher, max_depth - 1, next_slash_after_glob_pos);
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}
}
return result;
}
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageFile.
*/
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(const String & path_for_ls, const HDFSFSPtr & fs, const String & for_match)
std::vector<StorageHDFS::PathWithInfo> LSWithRegexpMatching(
const String & path_for_ls,
const HDFSFSPtr & fs,
const String & for_match)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t first_glob_pos = for_match.find_first_of("*?{");
const bool has_glob = first_glob_pos != std::string::npos;
const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const String suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
const String prefix_without_globs = path_for_ls + for_match.substr(1, end_of_path_without_globs); /// ends with '/'
const size_t next_slash = suffix_with_globs.find('/', 1);
re2::RE2 matcher(makeRegexpPatternFromGlobs(suffix_with_globs.substr(0, next_slash)));
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);
size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();
const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);
re2::RE2 matcher(makeRegexpPatternFromGlobs(current_glob));
if (!matcher.ok())
throw Exception(ErrorCodes::CANNOT_COMPILE_REGEXP,
"Cannot compile regex from glob ({}): {}", for_match, matcher.error());
if (slashes_in_glob)
{
return LSWithFoldedRegexpMatching(fs::path(prefix_without_globs), fs, "", suffix_with_globs,
matcher, slashes_in_glob, next_slash_after_glob_pos);
}
HDFSFileInfo ls;
ls.file_info = hdfsListDirectory(fs.get(), prefix_without_globs.data(), &ls.length);
if (ls.file_info == nullptr && errno != ENOENT) // NOLINT
@ -97,7 +205,7 @@ namespace
const String full_path = String(ls.file_info[i].mName);
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;
const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;
const bool is_directory = ls.file_info[i].mKind == 'D';
/// Condition with type of current file_info means what kind of path is it in current iteration of ls
if (!is_directory && !looking_for_directory)
@ -111,7 +219,7 @@ namespace
{
if (re2::RE2::FullMatch(file_name, matcher))
{
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash));
std::vector<StorageHDFS::PathWithInfo> result_part = LSWithRegexpMatching(fs::path(full_path) / "", fs, suffix_with_globs.substr(next_slash_after_glob_pos));
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
std::move(result_part.begin(), result_part.end(), std::back_inserter(result));
}

View File

@ -71,15 +71,12 @@ TableLockHolder IStorage::tryLockForShare(const String & query_id, const std::ch
return result;
}
IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds & acquire_timeout)
std::optional<IStorage::AlterLockHolder> IStorage::tryLockForAlter(const std::chrono::milliseconds & acquire_timeout)
{
AlterLockHolder lock{alter_lock, std::defer_lock};
if (!lock.try_lock_for(acquire_timeout))
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
"Locking attempt for ALTER on \"{}\" has timed out! ({} ms) "
"Possible deadlock avoided. Client should retry.",
getStorageID().getFullTableName(), acquire_timeout.count());
return {};
if (is_dropped || is_detached)
throw Exception(ErrorCodes::TABLE_IS_DROPPED, "Table {} is dropped or detached", getStorageID());
@ -87,6 +84,18 @@ IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds
return lock;
}
IStorage::AlterLockHolder IStorage::lockForAlter(const std::chrono::milliseconds & acquire_timeout)
{
if (auto lock = tryLockForAlter(acquire_timeout); lock == std::nullopt)
throw Exception(ErrorCodes::DEADLOCK_AVOIDED,
"Locking attempt for ALTER on \"{}\" has timed out! ({} ms) "
"Possible deadlock avoided. Client should retry.",
getStorageID().getFullTableName(), acquire_timeout.count());
else
return std::move(*lock);
}
TableExclusiveLockHolder IStorage::lockExclusively(const String & query_id, const std::chrono::milliseconds & acquire_timeout)
{

View File

@ -283,6 +283,7 @@ public:
/// sure, that we execute only one simultaneous alter. Doesn't affect share lock.
using AlterLockHolder = std::unique_lock<std::timed_mutex>;
AlterLockHolder lockForAlter(const std::chrono::milliseconds & acquire_timeout);
std::optional<AlterLockHolder> tryLockForAlter(const std::chrono::milliseconds & acquire_timeout);
/// Lock table exclusively. This lock must be acquired if you want to be
/// sure, that no other thread (SELECT, merge, ALTER, etc.) doing something

View File

@ -11,6 +11,7 @@
#include <Storages/extractKeyExpressionList.h>
#include <Core/Defines.h>
#include "Common/Exception.h"
namespace DB
@ -89,8 +90,16 @@ IndexDescription IndexDescription::getIndexFromAST(const ASTPtr & definition_ast
result.type = Poco::toLower(index_definition->type->name);
result.granularity = index_definition->granularity;
ASTPtr expr_list = extractKeyExpressionList(index_definition->expr->clone());
result.expression_list_ast = expr_list->clone();
ASTPtr expr_list;
if (index_definition->expr)
{
expr_list = extractKeyExpressionList(index_definition->expr->clone());
result.expression_list_ast = expr_list->clone();
}
else
{
throw Exception(ErrorCodes::LOGICAL_ERROR, "Expression is not set");
}
auto syntax = TreeRewriter(context).analyze(expr_list, columns.getAllPhysical());
result.expression = ExpressionAnalyzer(expr_list, syntax, context).getActions(true);

View File

@ -564,7 +564,17 @@ static const ActionsDAG::Node & cloneASTWithInversionPushDown(
}
case (ActionsDAG::ActionType::COLUMN):
{
res = &inverted_dag.addColumn({node.column, node.result_type, node.result_name});
String name;
if (const auto * column_const = typeid_cast<const ColumnConst *>(node.column.get()))
/// Re-generate column name for constant.
/// DAG form query (with enabled analyzer) uses suffixes for constants, like 1_UInt8.
/// DAG from PK does not use it. This is breakig match by column name sometimes.
/// Ideally, we should not compare manes, but DAG subtrees instead.
name = ASTLiteral(column_const->getDataColumn()[0]).getColumnName();
else
name = node.result_name;
res = &inverted_dag.addColumn({node.column, node.result_type, name});
break;
}
case (ActionsDAG::ActionType::ALIAS):

View File

@ -6994,7 +6994,8 @@ std::optional<ProjectionCandidate> MergeTreeData::getQueryProcessingStageWithAgg
ProjectionCandidate * selected_candidate = nullptr;
size_t min_sum_marks = std::numeric_limits<size_t>::max();
if (metadata_snapshot->minmax_count_projection && !has_lightweight_delete_parts.load(std::memory_order_relaxed)) /// Disable ReadFromStorage for parts with lightweight.
if (settings.optimize_use_implicit_projections && metadata_snapshot->minmax_count_projection
&& !has_lightweight_delete_parts.load(std::memory_order_relaxed)) /// Disable ReadFromStorage for parts with lightweight.
add_projection_candidate(*metadata_snapshot->minmax_count_projection, true);
std::optional<ProjectionCandidate> minmax_count_projection_candidate;
if (!candidates.empty())

View File

@ -171,23 +171,23 @@ void MergeTreeDataWriter::TemporaryPart::finalize()
projection->getDataPartStorage().precommitTransaction();
}
std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num)
std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfoPtr async_insert_info, const IColumn::Selector & selector, size_t partition_num)
{
if (nullptr == chunk_offsets)
if (nullptr == async_insert_info)
{
return {};
}
if (selector.empty())
{
return {chunk_offsets};
return {async_insert_info};
}
std::vector<ChunkOffsetsPtr> result(partition_num);
std::vector<AsyncInsertInfoPtr> result(partition_num);
std::vector<Int64> last_row_for_partition(partition_num, -1);
size_t offset_idx = 0;
for (size_t i = 0; i < selector.size(); ++i)
{
++last_row_for_partition[selector[i]];
if (i + 1 == chunk_offsets->offsets[offset_idx])
if (i + 1 == async_insert_info->offsets[offset_idx])
{
for (size_t part_id = 0; part_id < last_row_for_partition.size(); ++part_id)
{
@ -196,9 +196,12 @@ std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offs
continue;
size_t offset = static_cast<size_t>(last_row + 1);
if (result[part_id] == nullptr)
result[part_id] = std::make_shared<ChunkOffsets>();
result[part_id] = std::make_shared<AsyncInsertInfo>();
if (result[part_id]->offsets.empty() || offset > *result[part_id]->offsets.rbegin())
{
result[part_id]->offsets.push_back(offset);
result[part_id]->tokens.push_back(async_insert_info->tokens[offset_idx]);
}
}
++offset_idx;
}
@ -207,7 +210,7 @@ std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offs
}
BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, ChunkOffsetsPtr chunk_offsets)
const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info)
{
BlocksWithPartition result;
if (!block || !block.rows())
@ -218,8 +221,11 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
{
result.emplace_back(Block(block), Row{});
if (chunk_offsets != nullptr)
result[0].offsets = std::move(chunk_offsets->offsets);
if (async_insert_info != nullptr)
{
result[0].offsets = std::move(async_insert_info->offsets);
result[0].tokens = std::move(async_insert_info->tokens);
}
return result;
}
@ -236,7 +242,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
IColumn::Selector selector;
buildScatterSelector(partition_columns, partition_num_to_first_row, selector, max_parts);
auto chunk_offsets_with_partition = scatterOffsetsBySelector(chunk_offsets, selector, partition_num_to_first_row.size());
auto async_insert_info_with_partition = scatterAsyncInsertInfoBySelector(async_insert_info, selector, partition_num_to_first_row.size());
size_t partitions_count = partition_num_to_first_row.size();
result.reserve(partitions_count);
@ -255,8 +261,11 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
/// NOTE: returning a copy of the original block so that calculated partition key columns
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
if (!chunk_offsets_with_partition.empty())
result[0].offsets = std::move(chunk_offsets_with_partition[0]->offsets);
if (!async_insert_info_with_partition.empty())
{
result[0].offsets = std::move(async_insert_info_with_partition[0]->offsets);
result[0].tokens = std::move(async_insert_info_with_partition[0]->tokens);
}
return result;
}
@ -270,8 +279,11 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
result[i].block.getByPosition(col).column = std::move(scattered[i]);
}
for (size_t i = 0; i < chunk_offsets_with_partition.size(); ++i)
result[i].offsets = std::move(chunk_offsets_with_partition[i]->offsets);
for (size_t i = 0; i < async_insert_info_with_partition.size(); ++i)
{
result[i].offsets = std::move(async_insert_info_with_partition[i]->offsets);
result[i].tokens = std::move(async_insert_info_with_partition[i]->tokens);
}
return result;
}

View File

@ -23,14 +23,15 @@ struct BlockWithPartition
Block block;
Row partition;
std::vector<size_t> offsets;
std::vector<String> tokens;
BlockWithPartition(Block && block_, Row && partition_)
: block(block_), partition(std::move(partition_))
{
}
BlockWithPartition(Block && block_, Row && partition_, std::vector<size_t> && offsets_)
: block(block_), partition(std::move(partition_)), offsets(std::move(offsets_))
BlockWithPartition(Block && block_, Row && partition_, std::vector<size_t> && offsets_, std::vector<String> && tokens_)
: block(block_), partition(std::move(partition_)), offsets(std::move(offsets_)), tokens(std::move(tokens_))
{
}
};
@ -51,7 +52,7 @@ public:
* (split rows by partition)
* Works deterministically: if same block was passed, function will return same result in same order.
*/
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, ChunkOffsetsPtr chunk_offsets = nullptr);
static BlocksWithPartition splitBlockIntoParts(const Block & block, size_t max_parts, const StorageMetadataPtr & metadata_snapshot, ContextPtr context, AsyncInsertInfoPtr async_insert_info = nullptr);
/// This structure contains not completely written temporary part.
/// Some writes may happen asynchronously, e.g. for blob storages.

View File

@ -78,7 +78,7 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
unmerged_block_with_partition(std::move(unmerged_block_with_partition_)),
part_counters(std::move(part_counters_))
{
initBlockIDMap();
initBlockIDMap();
}
void initBlockIDMap()
@ -209,8 +209,8 @@ std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size
column->insert(datum);
}
Block block({ColumnWithTypeAndName(std::move(column), DataTypePtr(new DataTypeInt64()), "a")});
BlockWithPartition block1(std::move(block), Row(), std::move(offsets));
std::vector<String> tokens(offsets.size());
BlockWithPartition block1(std::move(block), Row(), std::move(offsets), std::move(tokens));
ProfileEvents::Counters profile_counters;
ReplicatedMergeTreeSinkImpl<true>::DelayedChunk::Partition part(
&Poco::Logger::get("testSelfDeduplicate"), MergeTreeDataWriter::TemporaryPart(), 0, std::move(hashes), std::move(block1), std::nullopt, std::move(profile_counters));
@ -242,22 +242,29 @@ namespace
size_t start = 0;
auto cols = block.block.getColumns();
std::vector<String> block_id_vec;
for (auto offset : block.offsets)
for (size_t i = 0; i < block.offsets.size(); ++i)
{
SipHash hash;
for (size_t i = start; i < offset; ++i)
size_t offset = block.offsets[i];
std::string_view token = block.tokens[i];
if (token.empty())
{
for (const auto & col : cols)
col->updateHashWithValue(i, hash);
}
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
SipHash hash;
for (size_t j = start; j < offset; ++j)
{
for (const auto & col : cols)
col->updateHashWithValue(j, hash);
}
union
{
char bytes[16];
UInt64 words[2];
} hash_value;
hash.get128(hash_value.bytes);
block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1]));
block_id_vec.push_back(partition_id + "_" + DB::toString(hash_value.words[0]) + "_" + DB::toString(hash_value.words[1]));
}
else
block_id_vec.push_back(partition_id + "_" + std::string(token));
start = offset;
}
@ -418,18 +425,18 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
convertDynamicColumnsToTuples(block, storage_snapshot);
ChunkOffsetsPtr chunk_offsets;
AsyncInsertInfoPtr async_insert_info;
if constexpr (async_insert)
{
const auto & chunk_info = chunk.getChunkInfo();
if (const auto * chunk_offsets_ptr = typeid_cast<const ChunkOffsets *>(chunk_info.get()))
chunk_offsets = std::make_shared<ChunkOffsets>(chunk_offsets_ptr->offsets);
if (const auto * async_insert_info_ptr = typeid_cast<const AsyncInsertInfo *>(chunk_info.get()))
async_insert_info = std::make_shared<AsyncInsertInfo>(async_insert_info_ptr->offsets, async_insert_info_ptr->tokens);
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "No chunk info for async inserts");
}
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, chunk_offsets);
auto part_blocks = storage.writer.splitBlockIntoParts(block, max_parts_per_block, metadata_snapshot, context, async_insert_info);
using DelayedPartition = typename ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk::Partition;
using DelayedPartitions = std::vector<DelayedPartition>;
@ -453,7 +460,7 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
{
/// we copy everything but offsets which we move because they are only used by async insert
if (settings.optimize_on_insert && storage.writer.getMergingMode() != MergeTreeData::MergingParams::Mode::Ordinary)
unmerged_block.emplace(Block(current_block.block), Row(current_block.partition), std::move(current_block.offsets));
unmerged_block.emplace(Block(current_block.block), Row(current_block.partition), std::move(current_block.offsets), std::move(current_block.tokens));
}
/// Write part to the filesystem under temporary name. Calculate a checksum.
@ -468,7 +475,6 @@ void ReplicatedMergeTreeSinkImpl<async_insert>::consume(Chunk chunk)
if constexpr (async_insert)
{
/// TODO consider insert_deduplication_token
block_id = getHashesForBlocks(unmerged_block.has_value() ? *unmerged_block : current_block, temp_part.part->info.partition_id);
LOG_TRACE(log, "async insert part, part id {}, block id {}, offsets {}, size {}", temp_part.part->info.partition_id, toString(block_id), toString(current_block.offsets), current_block.offsets.size());
}

View File

@ -8,7 +8,7 @@
namespace DB {
std::vector<ChunkOffsetsPtr> scatterOffsetsBySelector(ChunkOffsetsPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num);
std::vector<AsyncInsertInfoPtr> scatterAsyncInsertInfoBySelector(AsyncInsertInfoPtr chunk_offsets, const IColumn::Selector & selector, size_t partition_num);
class AsyncInsertsTest : public ::testing::TestPartResult
{};
@ -16,31 +16,36 @@ class AsyncInsertsTest : public ::testing::TestPartResult
TEST(AsyncInsertsTest, testScatterOffsetsBySelector)
{
auto test_impl = [](std::vector<size_t> offsets, std::vector<size_t> selector_data, size_t part_num, std::vector<std::vector<size_t>> expected)
auto test_impl = [](std::vector<size_t> offsets, std::vector<size_t> selector_data, std::vector<String> tokens, size_t part_num, std::vector<std::vector<std::tuple<size_t, String>>> expected)
{
auto offset_ptr = std::make_shared<ChunkOffsets>(offsets);
auto offset_ptr = std::make_shared<AsyncInsertInfo>(offsets, tokens);
IColumn::Selector selector(selector_data.size());
size_t num_rows = selector_data.size();
for (size_t i = 0; i < num_rows; i++)
selector[i] = selector_data[i];
auto results = scatterOffsetsBySelector(offset_ptr, selector, part_num);
auto results = scatterAsyncInsertInfoBySelector(offset_ptr, selector, part_num);
ASSERT_EQ(results.size(), expected.size());
for (size_t i = 0; i < results.size(); i++)
{
auto result = results[i]->offsets;
auto result = results[i];
auto expect = expected[i];
ASSERT_EQ(result.size(), expect.size());
for (size_t j = 0; j < result.size(); j++)
ASSERT_EQ(result[j], expect[j]);
ASSERT_EQ(result->offsets.size(), expect.size());
ASSERT_EQ(result->tokens.size(), expect.size());
for (size_t j = 0; j < expect.size(); j++)
{
ASSERT_EQ(result->offsets[j], std::get<0>(expect[j]));
ASSERT_EQ(result->tokens[j], std::get<1>(expect[j]));
}
}
};
test_impl({5}, {0,1,0,1,0}, 2, {{3},{2}});
test_impl({5,10}, {0,1,0,1,0,1,0,1,0,1}, 2, {{3,5},{2,5}});
test_impl({4,8,12}, {0,1,0,1,0,2,0,2,1,2,1,2}, 3, {{2,4},{2,4},{2,4}});
test_impl({1,2,3,4,5}, {0,1,2,3,4}, 5, {{1},{1},{1},{1},{1}});
test_impl({3,6,10}, {1,1,1,2,2,2,0,0,0,0}, 3, {{4},{3},{3}});
test_impl({1}, {0}, {"a"}, 1, {{{1,"a"}}});
test_impl({5}, {0,1,0,1,0}, {"a"}, 2, {{{3,"a"}},{{2,"a"}}});
test_impl({5,10}, {0,1,0,1,0,1,0,1,0,1}, {"a", "b"}, 2, {{{3,"a"},{5,"b"}},{{2,"a"},{5,"b"}}});
test_impl({4,8,12}, {0,1,0,1,0,2,0,2,1,2,1,2}, {"a", "b", "c"}, 3, {{{2, "a"},{4, "b"}},{{2,"a"},{4,"c"}},{{2,"b"},{4,"c"}}});
test_impl({1,2,3,4,5}, {0,1,2,3,4}, {"a", "b", "c", "d", "e"}, 5, {{{1,"a"}},{{1,"b"}},{{1, "c"}},{{1, "d"}},{{1, "e"}}});
test_impl({3,6,10}, {1,1,1,2,2,2,0,0,0,0}, {"a", "b", "c"}, 3, {{{4, "c"}},{{3, "a"}},{{3, "b"}}});
}
std::vector<Int64> testSelfDeduplicate(std::vector<Int64> data, std::vector<size_t> offsets, std::vector<String> hashes);

View File

@ -93,6 +93,65 @@ namespace ErrorCodes
namespace
{
/// Forward-declare to use in listFilesWithFoldedRegexpMatchingImpl()
void listFilesWithRegexpMatchingImpl(
const std::string & path_for_ls,
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false);
/*
* When `{...}` has any `/`s, it must be processed in a different way:
* Basically, a path with globs is processed by listFilesWithRegexpMatchingImpl. In case it detects multi-dir glob {.../..., .../...},
* listFilesWithFoldedRegexpMatchingImpl is in charge from now on.
* It works a bit different: it still recursively goes through subdirectories, but does not match every directory to glob.
* Instead, it goes many levels down (until the approximate max_depth is reached) and compares this multi-dir path to a glob.
* StorageHDFS.cpp has the same logic.
*/
void listFilesWithFoldedRegexpMatchingImpl(const std::string & path_for_ls,
const std::string & processed_suffix,
const std::string & suffix_with_globs,
re2::RE2 & matcher,
size_t & total_bytes_to_read,
const size_t max_depth,
const size_t next_slash_after_glob_pos,
std::vector<std::string> & result)
{
if (!max_depth)
return;
const fs::directory_iterator end;
for (fs::directory_iterator it(path_for_ls); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String dir_or_file_name = full_path.substr(last_slash);
if (re2::RE2::FullMatch(processed_suffix + dir_or_file_name, matcher))
{
if (next_slash_after_glob_pos == std::string::npos)
{
total_bytes_to_read += it->file_size();
result.push_back(it->path().string());
}
else
{
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "" ,
suffix_with_globs.substr(next_slash_after_glob_pos),
total_bytes_to_read, result);
}
}
else if (it->is_directory())
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(full_path), processed_suffix + dir_or_file_name,
suffix_with_globs, matcher, total_bytes_to_read,
max_depth - 1, next_slash_after_glob_pos, result);
}
}
}
/* Recursive directory listing with matched paths as a result.
* Have the same method in StorageHDFS.
*/
@ -101,15 +160,42 @@ void listFilesWithRegexpMatchingImpl(
const std::string & for_match,
size_t & total_bytes_to_read,
std::vector<std::string> & result,
bool recursive = false)
bool recursive)
{
const size_t first_glob = for_match.find_first_of("*?{");
const size_t first_glob_pos = for_match.find_first_of("*?{");
const bool has_glob = first_glob_pos != std::string::npos;
const size_t end_of_path_without_globs = for_match.substr(0, first_glob).rfind('/');
const size_t end_of_path_without_globs = for_match.substr(0, first_glob_pos).rfind('/');
const std::string suffix_with_globs = for_match.substr(end_of_path_without_globs); /// begin with '/'
const size_t next_slash = suffix_with_globs.find('/', 1);
const std::string current_glob = suffix_with_globs.substr(0, next_slash);
/// slashes_in_glob counter is a upper-bound estimate of recursion depth
/// needed to process complex cases when `/` is included into glob, e.g. /pa{th1/a,th2/b}.csv
size_t slashes_in_glob = 0;
const size_t next_slash_after_glob_pos = [&]()
{
if (!has_glob)
return suffix_with_globs.find('/', 1);
size_t in_curly = 0;
for (std::string::const_iterator it = ++suffix_with_globs.begin(); it != suffix_with_globs.end(); it++)
{
if (*it == '{')
++in_curly;
else if (*it == '/')
{
if (in_curly)
++slashes_in_glob;
else
return size_t(std::distance(suffix_with_globs.begin(), it));
}
else if (*it == '}')
--in_curly;
}
return std::string::npos;
}();
const std::string current_glob = suffix_with_globs.substr(0, next_slash_after_glob_pos);
auto regexp = makeRegexpPatternFromGlobs(current_glob);
re2::RE2 matcher(regexp);
@ -126,13 +212,22 @@ void listFilesWithRegexpMatchingImpl(
if (!fs::exists(prefix_without_globs))
return;
const bool looking_for_directory = next_slash_after_glob_pos != std::string::npos;
if (slashes_in_glob)
{
listFilesWithFoldedRegexpMatchingImpl(fs::path(prefix_without_globs), "", suffix_with_globs,
matcher, total_bytes_to_read, slashes_in_glob,
next_slash_after_glob_pos, result);
return;
}
const fs::directory_iterator end;
for (fs::directory_iterator it(prefix_without_globs); it != end; ++it)
{
const std::string full_path = it->path().string();
const size_t last_slash = full_path.rfind('/');
const String file_name = full_path.substr(last_slash);
const bool looking_for_directory = next_slash != std::string::npos;
/// Condition is_directory means what kind of path is it in current iteration of ls
if (!it->is_directory() && !looking_for_directory)
@ -148,14 +243,12 @@ void listFilesWithRegexpMatchingImpl(
if (recursive)
{
listFilesWithRegexpMatchingImpl(fs::path(full_path).append(it->path().string()) / "" ,
looking_for_directory ? suffix_with_globs.substr(next_slash) : current_glob ,
looking_for_directory ? suffix_with_globs.substr(next_slash_after_glob_pos) : current_glob ,
total_bytes_to_read, result, recursive);
}
else if (looking_for_directory && re2::RE2::FullMatch(file_name, matcher))
{
/// Recursion depth is limited by pattern. '*' works only for depth = 1, for depth = 2 pattern path is '*/*'. So we do not need additional check.
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash), total_bytes_to_read, result);
}
listFilesWithRegexpMatchingImpl(fs::path(full_path) / "", suffix_with_globs.substr(next_slash_after_glob_pos), total_bytes_to_read, result);
}
}
}

View File

@ -599,7 +599,20 @@ void StorageMergeTree::mutate(const MutationCommands & commands, ContextPtr quer
/// Validate partition IDs (if any) before starting mutation
getPartitionIdsAffectedByCommands(commands, query_context);
Int64 version = startMutation(commands, query_context);
Int64 version;
{
/// It's important to serialize order of mutations with alter queries because
/// they can depend on each other.
if (auto alter_lock = tryLockForAlter(query_context->getSettings().lock_acquire_timeout); alter_lock == std::nullopt)
{
throw Exception(ErrorCodes::TIMEOUT_EXCEEDED,
"Cannot start mutation in {}ms because some metadata-changing ALTER (MODIFY|RENAME|ADD|DROP) is currently executing. "
"You can change this timeout with `lock_acquire_timeout` setting",
query_context->getSettings().lock_acquire_timeout.totalMilliseconds());
}
version = startMutation(commands, query_context);
}
if (query_context->getSettingsRef().mutations_sync > 0 || query_context->getCurrentTransaction())
waitForMutation(version, false);
}

View File

@ -1385,7 +1385,7 @@ void StorageReplicatedMergeTree::checkParts(bool skip_sanity_checks)
}
const UInt64 parts_to_fetch_blocks = std::accumulate(parts_to_fetch.cbegin(), parts_to_fetch.cend(), 0,
[&](UInt64 acc, const String& part_name)
[&](UInt64 acc, const String & part_name)
{
if (const auto part_info = MergeTreePartInfo::tryParsePartName(part_name, format_version))
return acc + part_info->getBlocksCount();
@ -2448,10 +2448,13 @@ bool StorageReplicatedMergeTree::executeReplaceRange(const LogEntry & entry)
if (part_desc->checksum_hex != part_desc->src_table_part->checksums.getTotalChecksumHex())
throw Exception(ErrorCodes::UNFINISHED, "Checksums of {} is suddenly changed", part_desc->src_table_part->name);
bool zero_copy_enabled = dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
/// Don't do hardlinks in case of zero-copy at any side (defensive programming)
bool source_zero_copy_enabled = dynamic_cast<const MergeTreeData *>(source_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
bool our_zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.copy_instead_of_hardlink = (our_zero_copy_enabled || source_zero_copy_enabled) && part_desc->src_table_part->isStoredOnRemoteDiskWithZeroCopySupport(),
.metadata_version_to_write = metadata_snapshot->getMetadataVersion()
};
auto [res_part, temporary_part_lock] = cloneAndLoadDataPartOnSameDisk(
@ -7585,8 +7588,10 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
UInt64 index = lock->getNumber();
MergeTreePartInfo dst_part_info(partition_id, index, index, src_part->info.level);
/// Don't do hardlinks in case of zero-copy at any side (defensive programming)
bool zero_copy_enabled = storage_settings_ptr->allow_remote_fs_zero_copy_replication
|| dynamic_cast<const MergeTreeData *>(dest_table.get())->getSettings()->allow_remote_fs_zero_copy_replication;
IDataPartStorage::ClonePartParams clone_params
{
.copy_instead_of_hardlink = zero_copy_enabled && src_part->isStoredOnRemoteDiskWithZeroCopySupport(),

View File

@ -974,6 +974,7 @@ StorageS3::StorageS3(
FormatFactory::instance().checkFormatName(configuration.format);
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration.url.uri);
context_->getGlobalContext()->getHTTPHeaderFilter().checkHeaders(configuration.headers_from_ast);
StorageInMemoryMetadata storage_metadata;
if (columns_.empty())

View File

@ -44,6 +44,8 @@ StorageS3Cluster::StorageS3Cluster(
, s3_configuration{configuration_}
{
context_->getGlobalContext()->getRemoteHostFilter().checkURL(configuration_.url.uri);
context_->getGlobalContext()->getHTTPHeaderFilter().checkHeaders(configuration_.headers_from_ast);
StorageInMemoryMetadata storage_metadata;
updateConfigurationIfChanged(context_);

View File

@ -1019,6 +1019,7 @@ StorageURL::StorageURL(
distributed_processing_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
context_->getHTTPHeaderFilter().checkHeaders(headers);
}

View File

@ -48,6 +48,7 @@ StorageURLCluster::StorageURLCluster(
, uri(uri_)
{
context_->getRemoteHostFilter().checkURL(Poco::URI(uri));
context_->getHTTPHeaderFilter().checkHeaders(configuration_.headers);
StorageInMemoryMetadata storage_metadata;

View File

@ -18,6 +18,8 @@
#include <Storages/NamedCollectionsHelpers.h>
#include <Formats/FormatFactory.h>
#include "registerTableFunctions.h"
#include <Analyzer/FunctionNode.h>
#include <Analyzer/TableFunctionNode.h>
#include <boost/algorithm/string.hpp>
@ -32,6 +34,24 @@ namespace ErrorCodes
}
std::vector<size_t> TableFunctionS3::skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr) const
{
auto & table_function_node = query_node_table_function->as<TableFunctionNode &>();
auto & table_function_arguments_nodes = table_function_node.getArguments().getNodes();
size_t table_function_arguments_size = table_function_arguments_nodes.size();
std::vector<size_t> result;
for (size_t i = 0; i < table_function_arguments_size; ++i)
{
auto * function_node = table_function_arguments_nodes[i]->as<FunctionNode>();
if (function_node && function_node->getFunctionName() == "headers")
result.push_back(i);
}
return result;
}
/// This is needed to avoid copy-pase. Because s3Cluster arguments only differ in additional argument (first) - cluster name
void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context)
{
@ -41,13 +61,14 @@ void TableFunctionS3::parseArgumentsImpl(ASTs & args, const ContextPtr & context
}
else
{
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
auto * header_it = StorageURL::collectHeaders(args, configuration.headers_from_ast, context);
if (header_it != args.end())
args.erase(header_it);
if (args.empty() || args.size() > 6)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH, "The signature of table function {} shall be the following:\n{}", getName(), getSignature());
for (auto & arg : args)
arg = evaluateConstantExpressionOrIdentifierAsLiteral(arg, context);

View File

@ -73,6 +73,10 @@ protected:
mutable StorageS3::Configuration configuration;
ColumnsDescription structure_hint;
private:
std::vector<size_t> skipAnalysisForArguments(const QueryTreeNodePtr & query_node_table_function, ContextPtr context) const override;
};
}

View File

@ -246,6 +246,12 @@ def main():
if args.check_running_workflows:
workflows = get_workflows_for_head(repo, pr.head.sha)
logging.info(
"The PR #%s has following workflows:\n%s",
pr.number,
"\n".join(f"{wf.html_url}: status is {wf.status}" for wf in workflows),
)
workflows_in_progress = [wf for wf in workflows if wf.status != "completed"]
# At most one workflow in progress is fine. We check that there no
# cases like, e.g. PullRequestCI and DocksCheck in progress at once

View File

@ -0,0 +1,6 @@
<clickhouse>
<http_forbid_headers>
<header>exact_header</header>
<header_regexp>(?i)(case_insensitive_header)</header_regexp>
</http_forbid_headers>
</clickhouse>

View File

@ -1,64 +1,14 @@
<clickhouse>
<storage_configuration>
<disks>
<!-- s3 disks -->
<s3_common_disk>
<s3_disk>
<type>s3</type>
<path>s3_common_disk/</path>
<path>s3_disk/</path>
<endpoint>http://localhost:11111/test/common/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_common_disk>
<s3_disk>
<type>s3</type>
<path>s3_disk/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk>
<s3_disk_2>
<type>s3</type>
<path>s3_disk_2/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_2>
<s3_disk_3>
<type>s3</type>
<path>s3_disk_3/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_3>
<s3_disk_4>
<type>s3</type>
<path>s3_disk_4/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_4>
<s3_disk_5>
<type>s3</type>
<path>s3_disk_5/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_5>
<s3_disk_6>
<type>s3</type>
<path>s3_disk_6/</path>
<endpoint>http://localhost:11111/test/00170_test/</endpoint>
<access_key_id>clickhouse</access_key_id>
<secret_access_key>clickhouse</secret_access_key>
<request_timeout_ms>20000</request_timeout_ms>
</s3_disk_6>
<!-- cache for s3 disks -->
<s3_cache>
<type>cache</type>
<disk>s3_disk</disk>
@ -67,65 +17,6 @@
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache>
<s3_cache_2>
<type>cache</type>
<disk>s3_disk_2</disk>
<path>s3_cache_2/</path>
<max_size>128Mi</max_size>
<max_file_segment_size>100Mi</max_file_segment_size>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_2>
<s3_cache_3>
<type>cache</type>
<disk>s3_disk_3</disk>
<path>s3_disk_3_cache/</path>
<max_size>128Mi</max_size>
<data_cache_max_size>22548578304</data_cache_max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<enable_cache_hits_threshold>1</enable_cache_hits_threshold>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_3>
<s3_cache_4>
<type>cache</type>
<disk>s3_disk_4</disk>
<path>s3_cache_4/</path>
<max_size>128Mi</max_size>
<cache_on_write_operations>1</cache_on_write_operations>
<enable_filesystem_query_cache_limit>1</enable_filesystem_query_cache_limit>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_4>
<s3_cache_5>
<type>cache</type>
<disk>s3_disk_5</disk>
<path>s3_cache_5/</path>
<max_size>128Mi</max_size>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_5>
<s3_cache_6>
<type>cache</type>
<disk>s3_disk_6</disk>
<path>s3_cache_6/</path>
<max_size>128Mi</max_size>
<enable_bypass_cache_with_threashold>1</enable_bypass_cache_with_threashold>
<bypass_cache_threashold>100</bypass_cache_threashold>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_6>
<s3_cache_small>
<type>cache</type>
<disk>s3_disk_6</disk>
<path>s3_cache_small/</path>
<max_size>1000</max_size>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_small>
<s3_cache_small_segment_size>
<type>cache</type>
<disk>s3_disk_6</disk>
<path>s3_cache_small_segment_size/</path>
<max_size>128Mi</max_size>
<max_file_segment_size>10Ki</max_file_segment_size>
<cache_on_write_operations>1</cache_on_write_operations>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
</s3_cache_small_segment_size>
<!-- local disks -->
<local_disk>
<type>local_blob_storage</type>
@ -167,7 +58,7 @@
<!-- multi layer cache -->
<s3_cache_multi>
<type>cache</type>
<disk>s3_cache_5</disk>
<disk>s3_cache</disk>
<path>s3_cache_multi/</path>
<max_size>22548578304</max_size>
<delayed_cleanup_interval_ms>100</delayed_cleanup_interval_ms>
@ -188,34 +79,6 @@
</main>
</volumes>
</s3_cache>
<s3_cache_2>
<volumes>
<main>
<disk>s3_cache_2</disk>
</main>
</volumes>
</s3_cache_2>
<s3_cache_3>
<volumes>
<main>
<disk>s3_cache_3</disk>
</main>
</volumes>
</s3_cache_3>
<s3_cache_4>
<volumes>
<main>
<disk>s3_cache_4</disk>
</main>
</volumes>
</s3_cache_4>
<s3_cache_6>
<volumes>
<main>
<disk>s3_cache_6</disk>
</main>
</volumes>
</s3_cache_6>
<s3_cache_multi>
<volumes>
<main>
@ -223,13 +86,6 @@
</main>
</volumes>
</s3_cache_multi>
<s3_cache_small>
<volumes>
<main>
<disk>s3_cache_small</disk>
</main>
</volumes>
</s3_cache_small>
<local_cache>
<volumes>
<main>
@ -251,13 +107,6 @@
</main>
</volumes>
</local_cache_3>
<s3_cache_small_segment_size>
<volumes>
<main>
<disk>s3_cache_small_segment_size</disk>
</main>
</volumes>
</s3_cache_small_segment_size>
</policies>
</storage_configuration>
</clickhouse>

Some files were not shown because too many files have changed in this diff Show More