Merge branch 'master' into disk-s3-read-error-fix

This commit is contained in:
mergify[bot] 2021-06-11 15:00:43 +00:00 committed by GitHub
commit 60c094e785
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
221 changed files with 4314 additions and 1513 deletions

View File

@ -2,6 +2,7 @@
#### Upgrade Notes
* One bug has been found after release: [#25187](https://github.com/ClickHouse/ClickHouse/issues/25187).
* Do not upgrade if you have partition key with `UUID`.
* `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to update of compression algorithm and you can ignore them. These messages are informational and do not indicate any kinds of undesired behaviour.
* The setting `compile_expressions` is enabled by default. Although it has been heavily tested on variety of scenarios, if you find some undesired behaviour on your servers, you can try turning this setting off.

2
contrib/croaring vendored

@ -1 +1 @@
Subproject commit d8402939b5c9fc134fd4fcf058fe0f7006d2b129
Subproject commit 2c867e9f9c9e2a3a7032791f94c4c7ae3013f6e0

View File

@ -1 +1 @@
#*/10 * * * * root (which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart > /dev/null 2>&1
#*/10 * * * * root ((which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart) > /dev/null 2>&1

View File

@ -97,14 +97,10 @@ function fuzz
NEW_TESTS_OPT="${NEW_TESTS_OPT:-}"
fi
export CLICKHOUSE_WATCHDOG_ENABLE=0 # interferes with gdb
clickhouse-server --config-file db/config.xml -- --path db 2>&1 | tail -100000 > server.log &
server_pid=$!
kill -0 $server_pid
while ! clickhouse-client --query "select 1" && kill -0 $server_pid ; do echo . ; sleep 1 ; done
clickhouse-client --query "select 1"
kill -0 $server_pid
echo Server started
echo "
handle all noprint
@ -115,12 +111,31 @@ thread apply all backtrace
continue
" > script.gdb
gdb -batch -command script.gdb -p "$(pidof clickhouse-server)" &
gdb -batch -command script.gdb -p $server_pid &
# Check connectivity after we attach gdb, because it might cause the server
# to freeze and the fuzzer will fail.
for _ in {1..60}
do
sleep 1
if clickhouse-client --query "select 1"
then
break
fi
done
clickhouse-client --query "select 1" # This checks that the server is responding
kill -0 $server_pid # This checks that it is our server that is started and not some other one
echo Server started and responded
# SC2012: Use find instead of ls to better handle non-alphanumeric filenames. They are all alphanumeric.
# SC2046: Quote this to prevent word splitting. Actually I need word splitting.
# shellcheck disable=SC2012,SC2046
clickhouse-client --query-fuzzer-runs=1000 --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) $NEW_TESTS_OPT \
clickhouse-client \
--receive_timeout=10 \
--receive_data_timeout_ms=10000 \
--query-fuzzer-runs=1000 \
--queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
$NEW_TESTS_OPT \
> >(tail -n 100000 > fuzzer.log) \
2>&1 &
fuzzer_pid=$!
@ -198,13 +213,17 @@ continue
echo "success" > status.txt
echo "OK" > description.txt
else
# The server was alive, but the fuzzer returned some error. Probably this
# is a problem in the fuzzer itself. Don't grep the server log in this
# case, because we will find a message about normal server termination
# (Received signal 15), which is confusing.
# The server was alive, but the fuzzer returned some error. This might
# be some client-side error detected by fuzzing, or a problem in the
# fuzzer itself. Don't grep the server log in this case, because we will
# find a message about normal server termination (Received signal 15),
# which is confusing.
task_exit_code=$fuzzer_exit_code
echo "failure" > status.txt
echo "Fuzzer failed ($fuzzer_exit_code). See the logs." > description.txt
{ grep -o "Found error:.*" fuzzer.log \
|| grep -o "Exception.*" fuzzer.log \
|| echo "Fuzzer failed ($fuzzer_exit_code). See the logs." ; } \
| tail -1 > description.txt
fi
}

View File

@ -554,12 +554,6 @@ create table query_metric_stats_denorm engine File(TSVWithNamesAndTypes,
" 2> >(tee -a analyze/errors.log 1>&2)
# Fetch historical query variability thresholds from the CI database
clickhouse-local --query "
left join file('analyze/report-thresholds.tsv', TSV,
'test text, report_threshold float') thresholds
on query_metric_stats.test = thresholds.test
"
if [ -v CHPC_DATABASE_URL ]
then
set +x # Don't show password in the log
@ -577,7 +571,8 @@ then
--date_time_input_format=best_effort)
# Precision is going to be 1.5 times worse for PRs. How do I know it? I ran this:
# Precision is going to be 1.5 times worse for PRs, because we run the queries
# less times. How do I know it? I ran this:
# SELECT quantilesExact(0., 0.1, 0.5, 0.75, 0.95, 1.)(p / m)
# FROM
# (
@ -592,19 +587,27 @@ then
# query_display_name
# HAVING count(*) > 100
# )
# The file can be empty if the server is inaccessible, so we can't use TSVWithNamesAndTypes.
#
# The file can be empty if the server is inaccessible, so we can't use
# TSVWithNamesAndTypes.
#
"${client[@]}" --query "
select test, query_index,
quantileExact(0.99)(abs(diff)) max_diff,
quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 max_stat_threshold,
quantileExact(0.99)(abs(diff)) * 1.5 AS max_diff,
quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 AS max_stat_threshold,
query_display_name
from query_metrics_v2
where event_date > now() - interval 1 month
-- We use results at least one week in the past, so that the current
-- changes do not immediately influence the statistics, and we have
-- some time to notice that something is wrong.
where event_date between now() - interval 1 month - interval 1 week
and now() - interval 1 week
and metric = 'client_time'
and pr_number = 0
group by test, query_index, query_display_name
having count(*) > 100
" > analyze/historical-thresholds.tsv
set -x
else
touch analyze/historical-thresholds.tsv
fi
@ -1224,6 +1227,55 @@ unset IFS
function upload_results
{
# Prepare info for the CI checks table.
rm ci-checks.tsv
clickhouse-local --query "
create view queries as select * from file('report/queries.tsv', TSVWithNamesAndTypes,
'changed_fail int, changed_show int, unstable_fail int, unstable_show int,
left float, right float, diff float, stat_threshold float,
test text, query_index int, query_display_name text');
create table ci_checks engine File(TSVWithNamesAndTypes, 'ci-checks.tsv')
as select
$PR_TO_TEST pull_request_number,
'$SHA_TO_TEST' commit_sha,
'Performance' check_name,
'$(sed -n 's/.*<!--status: \(.*\)-->/\1/p' report.html)' check_status,
-- TODO toDateTime() can't parse output of 'date', so no time for now.
($(date +%s) - $CHPC_CHECK_START_TIMESTAMP) * 1000 check_duration_ms,
fromUnixTimestamp($CHPC_CHECK_START_TIMESTAMP) check_start_time,
test_name,
test_status,
test_duration_ms,
report_url,
$PR_TO_TEST = 0
? 'https://github.com/ClickHouse/ClickHouse/commit/$SHA_TO_TEST'
: 'https://github.com/ClickHouse/ClickHouse/pull/$PR_TO_TEST' pull_request_url,
'' commit_url,
'' task_url,
'' base_ref,
'' base_repo,
'' head_ref,
'' head_repo
from (
select '' test_name,
'$(sed -n 's/.*<!--message: \(.*\)-->/\1/p' report.html)' test_status,
0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
union all
select test || ' #' || toString(query_index), 'slower' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
|| test || '.' || toString(query_index) report_url
from queries where changed_fail != 0 and diff > 0
union all
select test || ' #' || toString(query_index), 'unstable' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
|| test || '.' || toString(query_index) report_url
from queries where unstable_fail != 0
)
;
"
if ! [ -v CHPC_DATABASE_URL ]
then
echo Database for test results is not specified, will not upload them.
@ -1292,6 +1344,10 @@ $REF_SHA $SHA_TO_TEST $(numactl --show | sed -n 's/^cpubind:[[:space:]]\+/numact
$REF_SHA $SHA_TO_TEST $(numactl --hardware | sed -n 's/^available:[[:space:]]\+/numactl-available /p')
EOF
# Also insert some data about the check into the CI checks table.
"${client[@]}" --query "INSERT INTO "'"'"gh-data"'"'".checks FORMAT TSVWithNamesAndTypes" \
< ci-checks.tsv
set -x
}

View File

@ -1,6 +1,9 @@
#!/bin/bash
set -ex
CHPC_CHECK_START_TIMESTAMP="$(date +%s)"
export CHPC_CHECK_START_TIMESTAMP
# Use the packaged repository to find the revision we will compare to.
function find_reference_sha
{

View File

@ -561,8 +561,9 @@ if args.report == 'main':
# Don't show mildly unstable queries, only the very unstable ones we
# treat as errors.
if very_unstable_queries:
error_tests += very_unstable_queries
status = 'failure'
if very_unstable_queries > 3:
error_tests += very_unstable_queries
status = 'failure'
message_array.append(str(very_unstable_queries) + ' unstable')
error_tests += slow_average_tests

View File

@ -1,6 +1,15 @@
#!/bin/bash
set -e
echo "Configure to use Yandex dockerhub-proxy"
mkdir -p /etc/docker/
cat > /etc/docker/daemon.json << EOF
{
"insecure-registries" : ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
"registry-mirrors" : ["http://dockerhub-proxy.sas.yp-c.yandex.net:5000"]
}
EOF
dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 &>/var/log/somefile &
set +e
@ -16,14 +25,6 @@ while true; do
done
set -e
echo "Configure to use Yandex dockerhub-proxy"
cat > /etc/docker/daemon.json << EOF
{
"insecure-registries": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
"registry-mirrors": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"]
}
EOF
echo "Start tests"
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse

View File

@ -94,6 +94,15 @@ For production environments, its recommended to use the latest `stable`-versi
To run ClickHouse inside Docker follow the guide on [Docker Hub](https://hub.docker.com/r/yandex/clickhouse-server/). Those images use official `deb` packages inside.
### Single Binary
You can install ClickHouse on Linux using single portable binary from the latest commit of the `master` branch: [https://builds.clickhouse.tech/master/amd64/clickhouse].
```
curl -O 'https://builds.clickhouse.tech/master/amd64/clickhouse' && chmod a+x clickhouse
sudo ./clickhouse install
```
### From Precompiled Binaries for Non-Standard Environments {#from-binaries-non-linux}
For non-Linux operating systems and for AArch64 CPU arhitecture, ClickHouse builds are provided as a cross-compiled binary from the latest commit of the `master` branch (with a few hours delay).
@ -104,7 +113,7 @@ For non-Linux operating systems and for AArch64 CPU arhitecture, ClickHouse buil
After downloading, you can use the `clickhouse client` to connect to the server, or `clickhouse local` to process local data.
Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (also with needed configuration files, configuring users etc.). After that run `clickhouse start` commands to start the clickhouse-server and `clickhouse-client` to connect to it.
Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (also with needed configuration files, configuring users etc.). After that run `clickhouse start` commands to start the clickhouse-server and `clickhouse-client` to connect to it.
These builds are not recommended for use in production environments because they are less thoroughly tested, but you can do so on your own risk. They also have only a subset of ClickHouse features available.

View File

@ -56,13 +56,13 @@ Note, that you can define multiple LDAP servers inside the `ldap_servers` sectio
- `port` — LDAP server port, default is `636` if `enable_tls` is set to `true`, `389` otherwise.
- `bind_dn` — Template used to construct the DN to bind to.
- The resulting DN will be constructed by replacing all `{user_name}` substrings of the template with the actual user name during each authentication attempt.
- `user_dn_detection` - Section with LDAP search parameters for detecting the actual user DN of the bound user.
- `user_dn_detection` Section with LDAP search parameters for detecting the actual user DN of the bound user.
- This is mainly used in search filters for further role mapping when the server is Active Directory. The resulting user DN will be used when replacing `{user_dn}` substrings wherever they are allowed. By default, user DN is set equal to bind DN, but once search is performed, it will be updated with to the actual detected user DN value.
- `base_dn` - Template used to construct the base DN for the LDAP search.
- `base_dn` Template used to construct the base DN for the LDAP search.
- The resulting DN will be constructed by replacing all `{user_name}` and `{bind_dn}` substrings of the template with the actual user name and bind DN during the LDAP search.
- `scope` - Scope of the LDAP search.
- `scope` Scope of the LDAP search.
- Accepted values are: `base`, `one_level`, `children`, `subtree` (the default).
- `search_filter` - Template used to construct the search filter for the LDAP search.
- `search_filter` Template used to construct the search filter for the LDAP search.
- The resulting filter will be constructed by replacing all `{user_name}`, `{bind_dn}`, and `{base_dn}` substrings of the template with the actual user name, bind DN, and base DN during the LDAP search.
- Note, that the special characters must be escaped properly in XML.
- `verification_cooldown` — A period of time, in seconds, after a successful bind attempt, during which the user will be assumed to be successfully authenticated for all consecutive requests without contacting the LDAP server.
@ -108,7 +108,6 @@ Note, that user `my_user` refers to `my_ldap_server`. This LDAP server must be c
When SQL-driven [Access Control and Account Management](../access-rights.md#access-control) is enabled, users that are authenticated by LDAP servers can also be created using the [CREATE USER](../../sql-reference/statements/create/user.md#create-user-statement) statement.
Query:
```sql

View File

@ -4,7 +4,7 @@ toc_priority: 144
# sumCount {#agg_function-sumCount}
Calculates the sum of the numbers and counts the number of rows at the same time.
Calculates the sum of the numbers and counts the number of rows at the same time. The function is used by ClickHouse query optimizer: if there are multiple `sum`, `count` or `avg` functions in a query, they can be replaced to single `sumCount` function to reuse the calculations. The function is rarely needed to use explicitly.
**Syntax**

View File

@ -0,0 +1,40 @@
---
toc_priority: 145
---
# sumKahan {#agg_function-sumKahan}
Calculates the sum of the numbers with [Kahan compensated summation algorithm](https://en.wikipedia.org/wiki/Kahan_summation_algorithm)
Slower than [sum](./sum.md) function.
The compensation works only for [Float](../../../sql-reference/data-types/float.md) types.
**Syntax**
``` sql
sumKahan(x)
```
**Arguments**
- `x` — Input value, must be [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md).
**Returned value**
- the sum of numbers, with type [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md) depends on type of input arguments
**Example**
Query:
``` sql
SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
```
Result:
``` text
┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
│ 0.9999999999999999 │ 1 │
└────────────────────┴───────────────┘
```

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Similar to `topK` but takes one additional argument of integer type - `weight`. Every value is accounted `weight` times for frequency calculation.
Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves). Additionally, the weight of the value is taken into account.
**Syntax**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Arguments**
- `N` — The number of elements to return.
**Arguments**
- `x` — The value.
- `weight` — The weight. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — The weight. Every value is accounted `weight` times for frequency calculation. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned value**
@ -40,3 +37,7 @@ Result:
│ [999,998,997,996,995,994,993,992,991,990] │
└───────────────────────────────────────────┘
```
**See Also**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -1,4 +1,4 @@
# LDAP {#external-authenticators-ldap}
# LDAP {#external-authenticators-ldap}
Для аутентификации пользователей ClickHouse можно использовать сервер LDAP. Существуют два подхода:
@ -17,6 +17,7 @@
<yandex>
<!- ... -->
<ldap_servers>
<!- Typical LDAP server. -->
<my_ldap_server>
<host>localhost</host>
<port>636</port>
@ -31,6 +32,18 @@
<tls_ca_cert_dir>/path/to/tls_ca_cert_dir</tls_ca_cert_dir>
<tls_cipher_suite>ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:AES256-GCM-SHA384</tls_cipher_suite>
</my_ldap_server>
<!- Typical Active Directory with configured user DN detection for further role mapping. -->
<my_ad_server>
<host>localhost</host>
<port>389</port>
<bind_dn>EXAMPLE\{user_name}</bind_dn>
<user_dn_detection>
<base_dn>CN=Users,DC=example,DC=com</base_dn>
<search_filter>(&amp;(objectClass=user)(sAMAccountName={user_name}))</search_filter>
</user_dn_detection>
<enable_tls>no</enable_tls>
</my_ad_server>
</ldap_servers>
</yandex>
```
@ -41,9 +54,18 @@
- `host` — имя хоста сервера LDAP или его IP. Этот параметр обязательный и не может быть пустым.
- `port` — порт сервера LDAP. Если настройка `enable_tls` равна `true`, то по умолчанию используется порт `636`, иначе — порт `389`.
- `bind_dn` — шаблон для создания DN для привязки.
- `bind_dn` — шаблон для создания DN подключения.
- При формировании DN все подстроки `{user_name}` в шаблоне будут заменяться на фактическое имя пользователя при каждой попытке аутентификации.
- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки привязки, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
- `user_dn_detection` — секция с параметрами LDAP поиска для определения фактического значения DN подключенного пользователя.
- Это в основном используется в фильтрах поиска для дальнейшего сопоставления ролей, когда сервер является Active Directory. Полученный DN пользователя будет использоваться при замене подстрок `{user_dn}` везде, где они разрешены. По умолчанию DN пользователя устанавливается равным DN подключения, но после выполнения поиска он будет обновлен до фактического найденного значения DN пользователя.
- `base_dn` — шаблон для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN подключения соответственно при каждом LDAP поиске.
- `scope` — область LDAP поиска.
- Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
- `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
- При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
- Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки подключения, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
- Чтобы отключить кеширование и заставить обращаться к серверу LDAP для каждого запроса аутентификации, укажите `0` (значение по умолчанию).
- `enable_tls` — флаг, включающий использование защищенного соединения с сервером LDAP.
- Укажите `no` для использования текстового протокола `ldap://` (не рекомендовано).
@ -106,7 +128,7 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
<yandex>
<!- ... -->
<user_directories>
<!- ... -->
<!- Typical LDAP server. -->
<ldap>
<server>my_ldap_server</server>
<roles>
@ -121,6 +143,18 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
<prefix>clickhouse_</prefix>
</role_mapping>
</ldap>
<!- Typical Active Directory with role mapping that relies on the detected user DN. -->
<ldap>
<server>my_ad_server</server>
<role_mapping>
<base_dn>CN=Users,DC=example,DC=com</base_dn>
<attribute>CN</attribute>
<scope>subtree</scope>
<search_filter>(&amp;(objectClass=group)(member={user_dn}))</search_filter>
<prefix>clickhouse_</prefix>
</role_mapping>
</ldap>
</user_directories>
</yandex>
```
@ -135,14 +169,14 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
- `role_mapping` — секция c параметрами LDAP поиска и правилами отображения.
- При аутентификации пользователя, пока еще связанного с LDAP, производится LDAP поиск с помощью `search_filter` и имени этого пользователя. Для каждой записи, найденной в ходе поиска, выделяется значение указанного атрибута. У каждого атрибута, имеющего указанный префикс, этот префикс удаляется, а остальная часть значения становится именем локальной роли, определенной в ClickHouse, причем предполагается, что эта роль была ранее создана запросом [CREATE ROLE](../../sql-reference/statements/create/role.md#create-role-statement) до этого.
- Внутри одной секции `ldap` может быть несколько секций `role_mapping`. Все они будут применены.
- `base_dn` — шаблон, который используется для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN привязки соответственно при каждом LDAP поиске.
- `scope`Область LDAP поиска.
- `base_dn` — шаблон для создания базового DN для LDAP поиска.
- При формировании DN все подстроки `{user_name}`, `{bind_dn}` и `{user_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения и DN пользователя соответственно при каждом LDAP поиске.
- `scope`область LDAP поиска.
- Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
- `search_filter` — шаблон, который используется для создания фильтра для каждого LDAP поиска.
- при формировании фильтра все подстроки `{user_name}`, `{bind_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN привязки и базовый DN соответственно при каждом LDAP поиске.
- `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
- При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
- Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
- `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском.
- `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском. По умолчанию: `cn`.
- `prefix` — префикс, который, как предполагается, будет находиться перед началом каждой строки в исходном списке строк, возвращаемых LDAP поиском. Префикс будет удален из исходных строк, а сами они будут рассматриваться как имена локальных ролей. По умолчанию: пустая строка.
[Оригинальная статья](https://clickhouse.tech/docs/en/operations/external-authenticators/ldap) <!--hide-->

View File

@ -0,0 +1,39 @@
---
toc_priority: 145
---
# sumKahan {#agg_function-sumKahan}
Вычисляет сумму с использованием [компенсационного суммирования по алгоритму Кэхэна](https://ru.wikipedia.org/wiki/Алгоритм_Кэхэна).
Работает медленнее функции [sum](./sum.md).
Компенсация работает только для [Float](../../../sql-reference/data-types/float.md) типов.
**Синтаксис**
``` sql
sumKahan(x)
```
**Аргументы**
- `x` — Входное значение типа [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), или [Decimal](../../../sql-reference/data-types/decimal.md).
**Возвращемое значение**
- сумма чисел с типом [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), ил [Decimal](../../../sql-reference/data-types/decimal.md) зависящим от типа входящих аргументов
**Пример**
Запрос:
``` sql
SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
```
Результат:
``` text
┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
│ 0.9999999999999999 │ 1 │
└────────────────────┴───────────────┘
```

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Аналогична `topK`, но дополнительно принимает положительный целочисленный параметр `weight`. Каждое значение учитывается `weight` раз при расчёте частоты.
Возвращает массив наиболее часто встречающихся значений в указанном столбце. Результирующий массив упорядочен по убыванию частоты значения (не по самим значениям). Дополнительно учитывается вес значения.
**Синтаксис**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Аргументы**
- `N` — количество элементов для выдачи.
**Аргументы**
- `x` — значение.
- `weight` — вес. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — вес. Каждое значение учитывается `weight` раз при расчёте частоты. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Возвращаемое значение**
@ -41,3 +38,6 @@ SELECT topKWeighted(10)(number, number) FROM numbers(1000)
└───────────────────────────────────────────┘
```
**Смотрите также**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -319,13 +319,12 @@ GRANT INSERT(x,y) ON db.table TO john
Разрешает выполнять запросы [DROP](misc.md#drop) и [DETACH](misc.md#detach-statement) в соответствии со следующей иерархией привилегий:
- `DROP`. Уровень:
- `DROP`. Уровень: `GROUP`
- `DROP DATABASE`. Уровень: `DATABASE`
- `DROP TABLE`. Уровень: `TABLE`
- `DROP VIEW`. Уровень: `VIEW`
- `DROP DICTIONARY`. Уровень: `DICTIONARY`
### TRUNCATE {#grant-truncate}
Разрешает выполнять запросы [TRUNCATE](../../sql-reference/statements/truncate.md).

View File

@ -122,14 +122,14 @@ FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/big_prefix/file-{000
Запишем данные в файл `test-data.csv.gz`:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);
```
Запишем данные из существующей таблицы в файл `test-data.csv.gz`:
``` sql
INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;
```

View File

@ -1,27 +1,24 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 36
toc_title: HDFS
---
# HDFS {#table_engines-hdfs}
该引擎提供了集成 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统通过允许管理数据 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html)通过ClickHouse. 这个引擎是相似的
[文件](../special/file.md#table_engines-file) 和 [URL](../special/url.md#table_engines-url) 引擎但提供Hadoop特定功能。
这个引擎提供了与 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统的集成,允许通过 ClickHouse 管理 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) 上的数据。这个引擎类似于
[文件](../../../engines/table-engines/special/file.md#table_engines-file) 和 [URL](../../../engines/table-engines/special/url.md#table_engines-url) 引擎,但提供Hadoop特定功能。
## 用 {#usage}
## 用 {#usage}
``` sql
ENGINE = HDFS(URI, format)
```
`URI` 参数是HDFS中的整个文件URI。
`format` 参数指定一种可用的文件格式。 执行
`SELECT` 查询时,格式必须支持输入,并执行
`INSERT` queries for output. The available formats are listed in the
[格式](../../../interfaces/formats.md#formats) 科。
路径部分 `URI` 可能包含水珠。 在这种情况下,表将是只读的。
`URI` 参数是 HDFS 中整个文件的 URI。
`format` 参数指定一种可用的文件格式。 执行
`SELECT` 查询时,格式必须支持输入,以及执行
`INSERT` 查询时,格式必须支持输出. 你可以在 [格式](../../../interfaces/formats.md#formats) 章节查看可用的格式。
路径部分 `URI` 可能包含 glob 通配符。 在这种情况下,表将是只读的。
**示例:**
@ -58,20 +55,20 @@ SELECT * FROM hdfs_engine_table LIMIT 2
- 索引。
- 复制。
**路径中的水珠**
**路径中的通配符**
多个路径组件可以具有globs。 对于正在处理的文件应该存在并匹配到整个路径模式。 文件列表确定在 `SELECT` (不在 `CREATE` 时刻)。
多个路径组件可以具有 globs。 对于正在处理的文件应该存在并匹配到整个路径模式。 文件列表的确定是在 `SELECT` 的时候进行(而不是在 `CREATE` 的时候)。
- `*`Substitutes any number of any characters except `/` 包括空字符串。
- `?`Substitutes any single character.
- `{some_string,another_string,yet_another_one}`Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}`Substitutes any number in range from N to M including both borders.
- `*`替代任何数量的任何字符,除了 `/` 以及空字符串。
- `?`代替任何单个字符.
- `{some_string,another_string,yet_another_one}`替代任何字符串 `'some_string', 'another_string', 'yet_another_one'`.
- `{N..M}`替换 N 到 M 范围内的任何数字,包括两个边界的值.
建筑与 `{}` 类似于 [远程](../../../sql-reference/table-functions/remote.md) 表功能
`{}` 的结构类似于 [远程](../../../sql-reference/table-functions/remote.md) 表函数
**示例**
1. 假设我们在HDFS上有几个TSV格式的文件其中包含以下Uri:
1. 假设我们在 HDFS 上有几个 TSV 格式的文件,文件的 URI 如下:
- hdfs://hdfs1:9000/some_dir/some_file_1
- hdfs://hdfs1:9000/some_dir/some_file_2
@ -111,10 +108,98 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
CREARE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
## 配置 {#configuration}
与 GraphiteMergeTree 类似HDFS 引擎支持使用 ClickHouse 配置文件进行扩展配置。有两个配置键可以使用:全局 (`hdfs`) 和用户级别 (`hdfs_*`)。首先全局配置生效,然后用户级别配置生效 (如果用户级别配置存在) 。
``` xml
<!-- HDFS 引擎类型的全局配置选项 -->
<hdfs>
<hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
<hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
<hadoop_security_authentication>kerberos</hadoop_security_authentication>
</hdfs>
<!-- 用户 "root" 的指定配置 -->
<hdfs_root>
<hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
</hdfs_root>
```
### 可选配置选项及其默认值的列表
#### libhdfs3 支持的
| **参数** | **默认值** |
| rpc\_client\_connect\_tcpnodelay | true |
| dfs\_client\_read\_shortcircuit | true |
| output\_replace-datanode-on-failure | true |
| input\_notretry-another-node | false |
| input\_localread\_mappedfile | true |
| dfs\_client\_use\_legacy\_blockreader\_local | false |
| rpc\_client\_ping\_interval | 10 * 1000 |
| rpc\_client\_connect\_timeout | 600 * 1000 |
| rpc\_client\_read\_timeout | 3600 * 1000 |
| rpc\_client\_write\_timeout | 3600 * 1000 |
| rpc\_client\_socekt\_linger\_timeout | -1 |
| rpc\_client\_connect\_retry | 10 |
| rpc\_client\_timeout | 3600 * 1000 |
| dfs\_default\_replica | 3 |
| input\_connect\_timeout | 600 * 1000 |
| input\_read\_timeout | 3600 * 1000 |
| input\_write\_timeout | 3600 * 1000 |
| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
| dfs\_prefetchsize | 10 |
| input\_read\_getblockinfo\_retry | 3 |
| input\_localread\_blockinfo\_cachesize | 1000 |
| input\_read\_max\_retry | 60 |
| output\_default\_chunksize | 512 |
| output\_default\_packetsize | 64 * 1024 |
| output\_default\_write\_retry | 10 |
| output\_connect\_timeout | 600 * 1000 |
| output\_read\_timeout | 3600 * 1000 |
| output\_write\_timeout | 3600 * 1000 |
| output\_close\_timeout | 3600 * 1000 |
| output\_packetpool\_size | 1024 |
| output\_heeartbeat\_interval | 10 * 1000 |
| dfs\_client\_failover\_max\_attempts | 15 |
| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
| dfs\_client\_socketcache\_expiryMsec | 3000 |
| dfs\_client\_socketcache\_capacity | 16 |
| dfs\_default\_blocksize | 64 * 1024 * 1024 |
| dfs\_default\_uri | "hdfs://localhost:9000" |
| hadoop\_security\_authentication | "simple" |
| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
| dfs\_client\_log\_severity | "INFO" |
| dfs\_domain\_socket\_path | "" |
[HDFS 配置参考](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) 也许会解释一些参数的含义.
#### ClickHouse 额外的配置 {#clickhouse-extras}
| **参数** | **默认值** |
|hadoop\_kerberos\_keytab | "" |
|hadoop\_kerberos\_principal | "" |
|hadoop\_kerberos\_kinit\_command | kinit |
#### 限制 {#limitations}
* hadoop\_security\_kerberos\_ticket\_cache\_path 只能在全局配置, 不能指定用户
## Kerberos 支持 {#kerberos-support}
如果 hadoop\_security\_authentication 参数的值为 'kerberos' ClickHouse 将通过 Kerberos 设施进行认证。
[这里的](#clickhouse-extras) 参数和 hadoop\_security\_kerberos\_ticket\_cache\_path 也许会有帮助.
注意,由于 libhdfs3 的限制,只支持老式的方法。
数据节点的安全通信无法由 SASL 保证 ( HADOOP\_SECURE\_DN\_USER 是这种安全方法的一个可靠指标)
使用 tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh 脚本作为参考。
如果指定了 hadoop\_kerberos\_keytab, hadoop\_kerberos\_principal 或者 hadoop\_kerberos\_kinit\_command ,将会调用 kinit 工具.在此情况下, hadoop\_kerberos\_keytab 和 hadoop\_kerberos\_principal 参数是必须配置的. kinit 工具和 krb5 配置文件是必要的.
## 虚拟列 {#virtual-columns}
- `_path` — Path to the file.
- `_file` — Name of the file.
- `_path`文件路径.
- `_file`文件名.
**另请参阅**

View File

@ -1336,7 +1336,7 @@ private:
fmt::print(
stderr,
"IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
"Found error: IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
exit(1);
}
@ -1461,7 +1461,7 @@ private:
const auto text_3 = ast_3->formatForErrorMessage();
if (text_3 != text_2)
{
fmt::print(stderr, "The query formatting is broken.\n");
fmt::print(stderr, "Found error: The query formatting is broken.\n");
printChangedSettings();

View File

@ -325,14 +325,14 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
// the generic recursion into IAST.children.
}
void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
{
switch (fuzz_rand() % 40)
{
case 0:
{
const auto r = fuzz_rand() % 3;
frame.type = r == 0 ? WindowFrame::FrameType::Rows
def.frame_type = r == 0 ? WindowFrame::FrameType::Rows
: r == 1 ? WindowFrame::FrameType::Range
: WindowFrame::FrameType::Groups;
break;
@ -340,44 +340,65 @@ void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
case 1:
{
const auto r = fuzz_rand() % 3;
frame.begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
def.frame_begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
if (def.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
// The offsets are fuzzed normally through 'children'.
def.frame_begin_offset
= std::make_shared<ASTLiteral>(getRandomField(0));
}
else
{
def.frame_begin_offset = nullptr;
}
break;
}
case 2:
{
const auto r = fuzz_rand() % 3;
frame.end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
def.frame_end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
break;
}
case 3:
{
frame.begin_offset = getRandomField(0).get<Int64>();
break;
}
case 4:
{
frame.end_offset = getRandomField(0).get<Int64>();
if (def.frame_end_type == WindowFrame::BoundaryType::Offset)
{
def.frame_end_offset
= std::make_shared<ASTLiteral>(getRandomField(0));
}
else
{
def.frame_end_offset = nullptr;
}
break;
}
case 5:
{
frame.begin_preceding = fuzz_rand() % 2;
def.frame_begin_preceding = fuzz_rand() % 2;
break;
}
case 6:
{
frame.end_preceding = fuzz_rand() % 2;
def.frame_end_preceding = fuzz_rand() % 2;
break;
}
default:
break;
}
frame.is_default = (frame == WindowFrame{});
if (def.frame_type == WindowFrame::FrameType::Range
&& def.frame_begin_type == WindowFrame::BoundaryType::Unbounded
&& def.frame_begin_preceding
&& def.frame_end_type == WindowFrame::BoundaryType::Current)
{
def.frame_is_default = true; /* NOLINT clang-tidy could you just shut up please */
}
else
{
def.frame_is_default = false;
}
}
void QueryFuzzer::fuzz(ASTs & asts)
@ -464,7 +485,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
auto & def = fn->window_definition->as<ASTWindowDefinition &>();
fuzzColumnLikeExpressionList(def.partition_by.get());
fuzzOrderByList(def.order_by.get());
fuzzWindowFrame(def.frame);
fuzzWindowFrame(def);
}
fuzz(fn->children);

View File

@ -17,7 +17,7 @@ namespace DB
class ASTExpressionList;
class ASTOrderByElement;
struct WindowFrame;
struct ASTWindowDefinition;
/*
* This is an AST-based query fuzzer that makes random modifications to query
@ -69,7 +69,7 @@ struct QueryFuzzer
void fuzzOrderByElement(ASTOrderByElement * elem);
void fuzzOrderByList(IAST * ast);
void fuzzColumnLikeExpressionList(IAST * ast);
void fuzzWindowFrame(WindowFrame & frame);
void fuzzWindowFrame(ASTWindowDefinition & def);
void fuzz(ASTs & asts);
void fuzz(ASTPtr & ast);
void collectFuzzInfoMain(const ASTPtr ast);

File diff suppressed because it is too large Load Diff

View File

@ -18,12 +18,13 @@ public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
ContextMutablePtr context_)
ContextMutablePtr context_,
Poco::Logger * log_)
: WithMutableContext(context_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
log(&Poco::Logger::get("ClusterCopier")) {}
log(log_) {}
void init();
@ -117,14 +118,14 @@ protected:
TaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
static ASTPtr removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns);
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@ -189,9 +190,7 @@ protected:
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD) const;
private:
String task_zookeeper_path;
@ -208,7 +207,6 @@ private:
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
Coordination::Stat task_description_current_stat{};
std::unique_ptr<TaskCluster> task_cluster;

View File

@ -22,8 +22,9 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
log_level = config().getString("log-level", "trace");
log_level = config().getString("log-level", "info");
is_safe_mode = config().has("safe-mode");
is_status_mode = config().has("status");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
if (config().has("move-fault-probability"))
@ -97,6 +98,7 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("base-dir").binding("base-dir"));
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
@ -106,6 +108,25 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
void ClusterCopierApp::mainImpl()
{
/// Status command
{
if (is_status_mode)
{
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
SCOPE_EXIT_SAFE(context->shutdown());
auto zookeeper = context->getZooKeeper();
auto status_json = zookeeper->get(task_path + "/status");
LOG_INFO(&logger(), "{}", status_json);
std::cout << status_json << std::endl;
context->resetZooKeeper();
return;
}
}
StatusFile status_file(process_path + "/status", StatusFile::write_full_info);
ThreadStatus thread_status;
@ -136,7 +157,7 @@ void ClusterCopierApp::mainImpl()
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context, log);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);

View File

@ -76,8 +76,9 @@ private:
std::string config_xml_path;
std::string task_path;
std::string log_level = "trace";
std::string log_level = "info";
bool is_safe_mode = false;
bool is_status_mode = false;
double copy_fault_probability = 0.0;
double move_fault_probability = 0.0;
bool is_help = false;

View File

@ -0,0 +1,65 @@
#pragma once
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <unordered_map>
#include <memory>
#include <string>
#include <iostream>
namespace DB
{
class StatusAccumulator
{
public:
struct TableStatus
{
size_t all_partitions_count;
size_t processed_partitions_count;
};
using Map = std::unordered_map<std::string, TableStatus>;
using MapPtr = std::shared_ptr<Map>;
static MapPtr fromJSON(std::string state_json)
{
Poco::JSON::Parser parser;
auto state = parser.parse(state_json).extract<Poco::JSON::Object::Ptr>();
MapPtr result_ptr = std::make_shared<Map>();
for (const auto & table_name : state->getNames())
{
auto table_status_json = state->getValue<std::string>(table_name);
auto table_status = parser.parse(table_status_json).extract<Poco::JSON::Object::Ptr>();
/// Map entry will be created if it is absent
auto & map_table_status = (*result_ptr)[table_name];
map_table_status.all_partitions_count += table_status->getValue<size_t>("all_partitions_count");
map_table_status.processed_partitions_count += table_status->getValue<size_t>("processed_partitions_count");
}
return result_ptr;
}
static std::string serializeToJSON(MapPtr statuses)
{
Poco::JSON::Object result_json;
for (const auto & [table_name, table_status] : *statuses)
{
Poco::JSON::Object status_json;
status_json.set("all_partitions_count", table_status.all_partitions_count);
status_json.set("processed_partitions_count", table_status.processed_partitions_count);
result_json.set(table_name, status_json);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(result_json, oss);
auto result = oss.str();
return result;
}
};
}

View File

@ -77,6 +77,8 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = 0;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
@ -92,11 +94,15 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}

View File

@ -36,27 +36,33 @@ struct TaskTable
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
bool isReplicatedTable() const { return is_replicated_table; }
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
String name_in_config;
/// Used as task ID
@ -83,7 +89,7 @@ struct TaskTable
String engine_push_zk_path;
bool is_replicated_table;
ASTPtr rewriteReplicatedCreateQueryToPlain();
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
/*
* A Distributed table definition used to split data
@ -181,6 +187,7 @@ struct TaskShard
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
@ -242,6 +249,16 @@ inline String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & p
return getPartitionPiecePath(partition_name, piece_number) + "/shards";
}
inline String TaskTable::getStatusAllPartitionCount() const
{
return task_cluster.task_zookeeper_path + "/status/all_partitions_count";
}
inline String TaskTable::getStatusProcessedPartitionsCount() const
{
return task_cluster.task_zookeeper_path + "/status/processed_partitions_count";
}
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent)
@ -250,7 +267,10 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
allow_to_drop_target_partitions = config.getBool(table_prefix + "allow_to_drop_target_partitions", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
@ -343,7 +363,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (auto & shard_info : cluster_pull->getShardsInfo())
for (const auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
@ -369,7 +389,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
local_shards.assign(all_shards.begin(), it_first_remote);
}
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
@ -383,9 +403,15 @@ inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
{
auto & replicated_table_arguments = new_engine_ast.arguments->children;
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
/// In some cases of Atomic database engine usage ReplicatedMergeTree tables
/// could be created without arguments.
if (!replicated_table_arguments.empty())
{
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
}
}
return new_storage_ast.clone();
@ -400,7 +426,7 @@ inline String DB::TaskShard::getDescription() const
inline String DB::TaskShard::getHostNameExample() const
{
auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}

View File

@ -1,7 +1,7 @@
#include <AggregateFunctions/AggregateFunctionArray.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionBitwise.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionBoundingRatio.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -3,7 +3,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,11 +1,9 @@
#include <Common/StringUtils/StringUtils.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -100,7 +100,7 @@ public:
}
/// Reset the state to specified value. This function is not the part of common interface.
void set(AggregateDataPtr __restrict place, UInt64 new_count)
void set(AggregateDataPtr __restrict place, UInt64 new_count) const
{
data(place).count = new_count;
}

View File

@ -2,7 +2,7 @@
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionEntropy.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,7 +16,8 @@ namespace ErrorCodes
namespace
{
AggregateFunctionPtr createAggregateFunctionEntropy(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionEntropy(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
if (argument_types.empty())

View File

@ -17,10 +17,10 @@
#include <Common/CurrentThread.h>
#include <Poco/String.h>
#include "registerAggregateFunctions.h"
#include <Functions/FunctionFactory.h>
namespace DB
{
struct Settings;
@ -95,7 +95,7 @@ AggregateFunctionPtr AggregateFunctionFactory::get(
// nullability themselves. Another special case is functions from Nothing
// that are rewritten to AggregateFunctionNothing, in this case
// nested_function is nullptr.
if (nested_function && nested_function->asWindowFunction())
if (nested_function && nested_function->isOnlyWindowFunction())
{
return nested_function;
}

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionForEach.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <Common/typeid_cast.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -50,7 +49,8 @@ inline AggregateFunctionPtr createAggregateFunctionGroupArrayImpl(const DataType
}
AggregateFunctionPtr createAggregateFunctionGroupArray(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionGroupArrayInsertAt.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,7 +16,8 @@ namespace ErrorCodes
namespace
{
AggregateFunctionPtr createAggregateFunctionGroupArrayInsertAt(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupArrayInsertAt(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);

View File

@ -5,7 +5,6 @@
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -58,7 +57,8 @@ inline AggregateFunctionPtr createAggregateFunctionMovingImpl(const std::string
}
template <template <typename, typename> class Function>
AggregateFunctionPtr createAggregateFunctionMoving(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionMoving(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -2,11 +2,11 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
// TODO include this last because of a broken roaring header. See the comment inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmap.h>
namespace DB
{
struct Settings;
@ -18,9 +18,8 @@ namespace ErrorCodes
namespace
{
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> class Data, typename... TArgs>
static IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs && ... args)
template <template <typename, typename> class AggregateFunctionTemplate, template <typename> typename Data, typename... TArgs>
static IAggregateFunction * createWithIntegerType(const IDataType & argument_type, TArgs &&... args)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::UInt8) return new AggregateFunctionTemplate<UInt8, Data<UInt8>>(std::forward<TArgs>(args)...);
@ -34,8 +33,9 @@ namespace
return nullptr;
}
template <template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionBitmap(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename Data>
AggregateFunctionPtr createAggregateFunctionBitmap(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -57,12 +57,13 @@ namespace
}
// Additional aggregate functions to manipulate bitmaps.
template <template <typename, typename> class AggregateFunctionTemplate>
AggregateFunctionPtr
createAggregateFunctionBitmapL2(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename AggregateFunctionTemplate>
AggregateFunctionPtr createAggregateFunctionBitmapL2(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
DataTypePtr argument_type_ptr = argument_types[0];
WhichDataType which(*argument_type_ptr);
if (which.idx != TypeIndex::AggregateFunction)
@ -70,11 +71,15 @@ namespace
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
/// groupBitmap needs to know about the data type that was used to create bitmaps.
/// We need to look inside the type of its argument to obtain it.
const DataTypeAggregateFunction & datatype_aggfunc = dynamic_cast<const DataTypeAggregateFunction &>(*argument_type_ptr);
AggregateFunctionPtr aggfunc = datatype_aggfunc.getFunction();
argument_type_ptr = aggfunc->getArgumentTypes()[0];
DataTypePtr nested_argument_type_ptr = aggfunc->getArgumentTypes()[0];
AggregateFunctionPtr res(createWithIntegerType<AggregateFunctionTemplate, AggregateFunctionGroupBitmapData>(
*argument_type_ptr, argument_type_ptr));
*nested_argument_type_ptr, argument_type_ptr));
if (!res)
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,

View File

@ -9,15 +9,19 @@
// TODO include this last because of a broken roaring header. See the comment inside.
#include <AggregateFunctions/AggregateFunctionGroupBitmapData.h>
namespace DB
{
struct Settings;
/// Counts bitmap operation on numbers.
template <typename T, typename Data>
class AggregateFunctionBitmap final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>
{
public:
AggregateFunctionBitmap(const DataTypePtr & type) : IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>({type}, {}) { }
AggregateFunctionBitmap(const DataTypePtr & type)
: IAggregateFunctionDataHelper<Data, AggregateFunctionBitmap<T, Data>>({type}, {})
{
}
String getName() const override { return Data::name(); }
@ -46,6 +50,7 @@ public:
};
/// This aggregate function takes the states of AggregateFunctionBitmap as its argument.
template <typename T, typename Data, typename Policy>
class AggregateFunctionBitmapL2 final : public IAggregateFunctionDataHelper<Data, AggregateFunctionBitmapL2<T, Data, Policy>>
{
@ -61,6 +66,11 @@ public:
bool allocatesMemoryInArena() const override { return false; }
DataTypePtr getStateType() const override
{
return this->argument_types.at(0);
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
Data & data_lhs = this->data(place);
@ -105,6 +115,7 @@ public:
}
};
template <typename Data>
class BitmapAndPolicy
{

View File

@ -1,9 +1,9 @@
#pragma once
#include <algorithm>
#include <boost/noncopyable.hpp>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <boost/noncopyable.hpp>
#include <Common/HashTable/SmallTable.h>
#include <Common/PODArray.h>
@ -14,9 +14,9 @@
#include <roaring.hh>
#include <roaring64map.hh>
namespace DB
{
struct Settings;
enum BitmapKind
{
@ -24,6 +24,7 @@ enum BitmapKind
Bitmap = 1
};
/**
* For a small number of values - an array of fixed size "on the stack".
* For large, roaring bitmap is allocated.

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -73,7 +72,8 @@ inline AggregateFunctionPtr createAggregateFunctionGroupUniqArrayImpl(const std:
}
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionGroupUniqArray(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);

View File

@ -4,7 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Common/FieldVisitors.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionIf.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
#include "AggregateFunctionNull.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -11,7 +11,6 @@
#include "AggregateFunctionFactory.h"
#include "FactoryHelpers.h"
#include "Helpers.h"
#include "registerAggregateFunctions.h"
namespace DB
@ -25,13 +24,14 @@ namespace ErrorCodes
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
namespace
{
using FuncLinearRegression = AggregateFunctionMLMethod<LinearModelData, NameLinearRegression>;
using FuncLogisticRegression = AggregateFunctionMLMethod<LinearModelData, NameLogisticRegression>;
template <class Method>
AggregateFunctionPtr
createAggregateFunctionMLMethod(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <typename Method>
AggregateFunctionPtr createAggregateFunctionMLMethod(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
if (parameters.size() > 4)
throw Exception(

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionMannWhitney.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
@ -17,7 +16,8 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionMannWhitneyUTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -11,12 +10,14 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMax(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionMax(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMaxData>(name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionArgMax(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionArgMax(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMaxData>(name, argument_types, parameters, settings));
}

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionMaxIntersections.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Helpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionMerge.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -50,6 +50,11 @@ public:
return nested_func->getReturnType();
}
DataTypePtr getStateType() const override
{
return nested_func->getStateType();
}
void create(AggregateDataPtr __restrict place) const override
{
nested_func->create(place);

View File

@ -1,7 +1,6 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/HelpersMinMaxAny.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -11,12 +10,14 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionMin(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionMin(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionSingleValue<AggregateFunctionsSingleValue, AggregateFunctionMinData>(name, argument_types, parameters, settings));
}
AggregateFunctionPtr createAggregateFunctionArgMin(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
AggregateFunctionPtr createAggregateFunctionArgMin(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings * settings)
{
return AggregateFunctionPtr(createAggregateFunctionArgMinMax<AggregateFunctionMinData>(name, argument_types, parameters, settings));
}

View File

@ -700,13 +700,11 @@ template <typename Data>
class AggregateFunctionsSingleValue final : public IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>
{
private:
DataTypePtr type;
SerializationPtr serialization;
public:
AggregateFunctionsSingleValue(const DataTypePtr & type_)
: IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type_}, {})
, type(this->argument_types[0])
AggregateFunctionsSingleValue(const DataTypePtr & type)
: IAggregateFunctionDataHelper<Data, AggregateFunctionsSingleValue<Data>>({type}, {})
, serialization(type->getDefaultSerialization())
{
if (StringRef(Data::name()) == StringRef("min")
@ -722,7 +720,7 @@ public:
DataTypePtr getReturnType() const override
{
return type;
return this->argument_types.at(0);
}
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override

View File

@ -4,12 +4,10 @@
#include <AggregateFunctions/AggregateFunctionCount.h>
#include <AggregateFunctions/AggregateFunctionState.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -1,12 +1,11 @@
#include <AggregateFunctions/AggregateFunctionOrFill.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace
{

View File

@ -4,7 +4,7 @@
#include <AggregateFunctions/Helpers.h>
#include <Core/Field.h>
#include "registerAggregateFunctions.h"
namespace DB
{
@ -84,7 +84,8 @@ static constexpr bool supportBigInt()
}
template <template <typename, bool> class Function>
AggregateFunctionPtr createAggregateFunctionQuantile(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
AggregateFunctionPtr createAggregateFunctionQuantile(
const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
/// Second argument type check doesn't depend on the type of the first one.
Function<void, true>::assertSecondArg(argument_types);

View File

@ -1,13 +1,12 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionRankCorrelation.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
#include <AggregateFunctions/Helpers.h>
namespace ErrorCodes
{
extern const int NOT_IMPLEMENTED;
extern const int NOT_IMPLEMENTED;
}
namespace DB
@ -17,7 +16,8 @@ struct Settings;
namespace
{
AggregateFunctionPtr createAggregateFunctionRankCorrelation(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionRankCorrelation(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -1,12 +1,10 @@
#include <AggregateFunctions/AggregateFunctionResample.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionRetention.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -6,7 +6,6 @@
#include <DataTypes/DataTypeDateTime.h>
#include <ext/range.h>
#include "registerAggregateFunctions.h"
namespace DB
{
@ -23,8 +22,9 @@ namespace ErrorCodes
namespace
{
template <template <typename, typename> class AggregateFunction, template <typename> class Data>
AggregateFunctionPtr createAggregateFunctionSequenceBase(const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
template <template <typename, typename> typename AggregateFunction, template <typename> typename Data>
AggregateFunctionPtr createAggregateFunctionSequenceBase(
const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
{
if (params.size() != 1)
throw Exception{"Aggregate function " + name + " requires exactly one parameter.",

View File

@ -2,17 +2,10 @@
#include <AggregateFunctions/AggregateFunctionMerge.h>
#include <AggregateFunctions/AggregateFunctionCombinatorFactory.h>
#include <DataTypes/DataTypeAggregateFunction.h>
#include "registerAggregateFunctions.h"
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
}
namespace
{
@ -44,36 +37,4 @@ void registerAggregateFunctionCombinatorState(AggregateFunctionCombinatorFactory
factory.registerCombinator(std::make_shared<AggregateFunctionCombinatorState>());
}
DataTypePtr AggregateFunctionState::getReturnType() const
{
auto ptr = std::make_shared<DataTypeAggregateFunction>(nested_func, arguments, params);
/// Special case: it is -MergeState combinator.
/// We must return AggregateFunction(agg, ...) instead of AggregateFunction(aggMerge, ...)
if (typeid_cast<const AggregateFunctionMerge *>(ptr->getFunction().get()))
{
if (arguments.size() != 1)
throw Exception("Combinator -MergeState expects only one argument", ErrorCodes::BAD_ARGUMENTS);
if (!typeid_cast<const DataTypeAggregateFunction *>(arguments[0].get()))
throw Exception("Combinator -MergeState expects argument with AggregateFunction type", ErrorCodes::BAD_ARGUMENTS);
return arguments[0];
}
if (!arguments.empty())
{
DataTypePtr argument_type_ptr = arguments[0];
WhichDataType which(*argument_type_ptr);
if (which.idx == TypeIndex::AggregateFunction)
{
if (arguments.size() != 1)
throw Exception("Nested aggregation expects only one argument", ErrorCodes::BAD_ARGUMENTS);
return arguments[0];
}
}
return ptr;
}
}

View File

@ -33,7 +33,15 @@ public:
return nested_func->getName() + "State";
}
DataTypePtr getReturnType() const override;
DataTypePtr getReturnType() const override
{
return getStateType();
}
DataTypePtr getStateType() const override
{
return nested_func->getStateType();
}
void create(AggregateDataPtr __restrict place) const override
{

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/AggregateFunctionStatistics.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,8 +16,9 @@ namespace ErrorCodes
namespace
{
template <template <typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -31,8 +31,9 @@ AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string &
return res;
}
template <template <typename, typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertBinary(name, argument_types);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/AggregateFunctionStatisticsSimple.h>
#include "registerAggregateFunctions.h"
namespace DB
@ -17,8 +16,9 @@ namespace ErrorCodes
namespace
{
template <template <typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsUnary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertUnary(name, argument_types);
@ -36,8 +36,9 @@ AggregateFunctionPtr createAggregateFunctionStatisticsUnary(const std::string &
return res;
}
template <template <typename, typename> class FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
template <template <typename, typename> typename FunctionTemplate>
AggregateFunctionPtr createAggregateFunctionStatisticsBinary(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertNoParameters(name, parameters);
assertBinary(name, argument_types);

View File

@ -3,8 +3,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
@ -57,7 +55,8 @@ struct StudentTTestData : public TTestMoments<Float64>
}
};
AggregateFunctionPtr createAggregateFunctionStudentTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionStudentTTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -2,7 +2,6 @@
#include <AggregateFunctions/AggregateFunctionSum.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -2,7 +2,7 @@
#include <AggregateFunctions/AggregateFunctionSumCount.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB
{

View File

@ -4,7 +4,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <Functions/FunctionHelpers.h>
#include <IO/WriteHelpers.h>
#include "registerAggregateFunctions.h"
namespace DB

View File

@ -11,12 +11,12 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
@ -38,12 +38,6 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
if (argument_types.size() == 1)
{
const IDataType & argument_type = *argument_types[0];
@ -86,12 +80,6 @@ AggregateFunctionPtr createAggregateFunctionUniq(const std::string & name, const
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
/// We use exact hash function if the user wants it;
/// or if the arguments are not contiguous in memory, because only exact hash function have support for this case.
bool use_exact_hash_function = is_exact || !isAllArgumentsContiguousInMemory(argument_types);

View File

@ -13,13 +13,13 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
namespace
@ -105,11 +105,6 @@ namespace
if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name, ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
switch (precision)
{

View File

@ -10,13 +10,13 @@
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
}
@ -47,11 +47,6 @@ AggregateFunctionPtr createAggregateFunctionUniqUpTo(const std::string & name, c
if (argument_types.empty())
throw Exception("Incorrect number of arguments for aggregate function " + name,
ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
const WhichDataType t(argument_types[0]);
if (t.isAggregateFunction())
throw Exception(
"Illegal type " + argument_types[0]->getName() + " of argument for aggregate function " + name,
ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
bool use_exact_hash_function = !isAllArgumentsContiguousInMemory(argument_types);

View File

@ -3,8 +3,6 @@
#include <AggregateFunctions/FactoryHelpers.h>
#include <AggregateFunctions/Moments.h>
#include "registerAggregateFunctions.h"
namespace ErrorCodes
{
@ -54,7 +52,8 @@ struct WelchTTestData : public TTestMoments<Float64>
}
};
AggregateFunctionPtr createAggregateFunctionWelchTTest(const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
AggregateFunctionPtr createAggregateFunctionWelchTTest(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertBinary(name, argument_types);
assertNoParameters(name, parameters);

View File

@ -2,6 +2,7 @@ include("${ClickHouse_SOURCE_DIR}/cmake/dbms_glob_sources.cmake")
add_headers_and_sources(clickhouse_aggregate_functions .)
list(REMOVE_ITEM clickhouse_aggregate_functions_sources
IAggregateFunction.cpp
AggregateFunctionFactory.cpp
AggregateFunctionCombinatorFactory.cpp
AggregateFunctionCount.cpp

View File

@ -0,0 +1,13 @@
#include <AggregateFunctions/IAggregateFunction.h>
#include <DataTypes/DataTypeAggregateFunction.h>
namespace DB
{
DataTypePtr IAggregateFunction::getStateType() const
{
return std::make_shared<DataTypeAggregateFunction>(shared_from_this(), argument_types, parameters);
}
}

View File

@ -38,7 +38,7 @@ using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;
class IAggregateFunction;
using AggregateFunctionPtr = std::shared_ptr<IAggregateFunction>;
using AggregateFunctionPtr = std::shared_ptr<const IAggregateFunction>;
struct AggregateFunctionProperties;
/** Aggregate functions interface.
@ -49,7 +49,7 @@ struct AggregateFunctionProperties;
* (which can be created in some memory pool),
* and IAggregateFunction is the external interface for manipulating them.
*/
class IAggregateFunction
class IAggregateFunction : public std::enable_shared_from_this<IAggregateFunction>
{
public:
IAggregateFunction(const DataTypes & argument_types_, const Array & parameters_)
@ -61,6 +61,9 @@ public:
/// Get the result type.
virtual DataTypePtr getReturnType() const = 0;
/// Get the data type of internal state. By default it is AggregateFunction(name(params), argument_types...).
virtual DataTypePtr getStateType() const;
/// Get type which will be used for prediction result in case if function is an ML method.
virtual DataTypePtr getReturnTypeToPredict() const
{
@ -236,9 +239,7 @@ public:
// aggregate functions implement IWindowFunction interface and so on. This
// would be more logically correct, but more complex. We only have a handful
// of true window functions, so this hack-ish interface suffices.
virtual IWindowFunction * asWindowFunction() { return nullptr; }
virtual const IWindowFunction * asWindowFunction() const
{ return const_cast<IAggregateFunction *>(this)->asWindowFunction(); }
virtual bool isOnlyWindowFunction() const { return false; }
protected:
DataTypes argument_types;

View File

@ -61,6 +61,7 @@ SRCS(
AggregateFunctionUniqUpTo.cpp
AggregateFunctionWelchTTest.cpp
AggregateFunctionWindowFunnel.cpp
IAggregateFunction.cpp
UniqCombinedBiasData.cpp
UniqVariadicHash.cpp
parseAggregateFunctionParameters.cpp

View File

@ -111,6 +111,7 @@ list (APPEND dbms_sources Functions/IFunction.cpp Functions/FunctionFactory.cpp
list (APPEND dbms_headers Functions/IFunction.h Functions/FunctionFactory.h Functions/FunctionHelpers.h Functions/extractTimeZoneFromFunctionArguments.h Functions/replicate.h Functions/FunctionsLogical.h)
list (APPEND dbms_sources
AggregateFunctions/IAggregateFunction.cpp
AggregateFunctions/AggregateFunctionFactory.cpp
AggregateFunctions/AggregateFunctionCombinatorFactory.cpp
AggregateFunctions/AggregateFunctionState.cpp

View File

@ -167,7 +167,7 @@ MutableColumnPtr ColumnAggregateFunction::predictValues(const ColumnsWithTypeAnd
MutableColumnPtr res = func->getReturnTypeToPredict()->createColumn();
res->reserve(data.size());
auto * machine_learning_function = func.get();
const auto * machine_learning_function = func.get();
if (machine_learning_function)
{
if (data.size() == 1)
@ -485,7 +485,7 @@ Arena & ColumnAggregateFunction::createOrGetArena()
}
static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Arena & arena, IAggregateFunction * func)
static void pushBackAndCreateState(ColumnAggregateFunction::Container & data, Arena & arena, const IAggregateFunction * func)
{
data.push_back(arena.alignedAlloc(func->sizeOfData(), func->alignOfData()));
try

View File

@ -11,6 +11,7 @@
#include <algorithm>
#include <functional>
#include <filesystem>
#include <boost/algorithm/string.hpp>
#include <Poco/DOM/Text.h>
#include <Poco/DOM/Attr.h>
#include <Poco/DOM/Comment.h>
@ -36,6 +37,7 @@ namespace DB
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_LOAD_CONFIG;
}
/// For cutting preprocessed path to this base
@ -437,6 +439,8 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
std::string extension = path.extension();
std::string base_name = path.stem();
boost::algorithm::to_lower(extension);
// Skip non-config and temporary files
if (fs::is_regular_file(path)
&& (extension == ".xml" || extension == ".conf" || extension == ".yaml" || extension == ".yml")
@ -462,13 +466,21 @@ XMLDocumentPtr ConfigProcessor::processConfig(
if (fs::exists(path))
{
fs::path p(path);
if (p.extension() == ".xml")
std::string extension = p.extension();
boost::algorithm::to_lower(extension);
if (extension == ".yaml" || extension == ".yml")
{
config = YAMLParser::parse(path);
}
else if (extension == ".xml" || extension == ".conf" || extension.empty())
{
config = dom_parser.parse(path);
}
else if (p.extension() == ".yaml" || p.extension() == ".yml")
else
{
config = YAMLParser::parse(path);
throw Exception(ErrorCodes::CANNOT_LOAD_CONFIG, "Unknown format of '{}' config", path);
}
}
else
@ -507,7 +519,10 @@ XMLDocumentPtr ConfigProcessor::processConfig(
XMLDocumentPtr with;
fs::path p(merge_file);
if (p.extension() == ".yaml" || p.extension() == ".yml")
std::string extension = p.extension();
boost::algorithm::to_lower(extension);
if (extension == ".yaml" || extension == ".yml")
{
with = YAMLParser::parse(merge_file);
}

View File

@ -269,7 +269,6 @@ public:
void operator() (const AggregateFunctionStateData & x) const;
};
template <typename T> constexpr bool isDecimalField() { return false; }
template <> constexpr bool isDecimalField<DecimalField<Decimal32>>() { return true; }
template <> constexpr bool isDecimalField<DecimalField<Decimal64>>() { return true; }

View File

@ -5,7 +5,7 @@ LIBRARY()
ADDINCL(
contrib/libs/lz4
contrib/libs/zstd
contrib/libs/zstd/include
)
PEERDIR(

View File

@ -4,7 +4,7 @@ LIBRARY()
ADDINCL(
contrib/libs/lz4
contrib/libs/zstd
contrib/libs/zstd/include
)
PEERDIR(

View File

@ -403,6 +403,7 @@ class IColumn;
M(Bool, optimize_if_chain_to_multiif, false, "Replace if(cond1, then1, if(cond2, ...)) chains to multiIf. Currently it's not beneficial for numeric types.", 0) \
M(Bool, optimize_if_transform_strings_to_enum, false, "Replaces string-type arguments in If and Transform to enum. Disabled by default cause it could make inconsistent change in distributed query that would lead to its fail.", 0) \
M(Bool, optimize_monotonous_functions_in_order_by, true, "Replace monotonous function with its argument in ORDER BY", 0) \
M(Bool, optimize_functions_to_subcolumns, false, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, normalize_function_names, true, "Normalize function names to their canonical names", 0) \
M(Bool, allow_experimental_alter_materialized_view_structure, false, "Allow atomic alter on Materialized views. Work in progress.", 0) \
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \

View File

@ -100,7 +100,7 @@ bool DataTypeAggregateFunction::equals(const IDataType & rhs) const
SerializationPtr DataTypeAggregateFunction::doGetDefaultSerialization() const
{
return std::make_shared<SerializationAggregateFunction>(function);
return std::make_shared<SerializationAggregateFunction>(function, getName());
}

View File

@ -20,19 +20,16 @@ namespace DB
void SerializationAggregateFunction::serializeBinary(const Field & field, WriteBuffer & ostr) const
{
const String & s = get<const String &>(field);
writeVarUInt(s.size(), ostr);
writeString(s, ostr);
const AggregateFunctionStateData & state = get<const AggregateFunctionStateData &>(field);
writeBinary(state.data, ostr);
}
void SerializationAggregateFunction::deserializeBinary(Field & field, ReadBuffer & istr) const
{
UInt64 size;
readVarUInt(size, istr);
field = String();
String & s = get<String &>(field);
s.resize(size);
istr.readStrict(s.data(), size);
field = AggregateFunctionStateData();
AggregateFunctionStateData & s = get<AggregateFunctionStateData &>(field);
readBinary(s.data, istr);
s.name = type_name;
}
void SerializationAggregateFunction::serializeBinary(const IColumn & column, size_t row_num, WriteBuffer & ostr) const

View File

@ -12,11 +12,13 @@ class SerializationAggregateFunction final : public ISerialization
{
private:
AggregateFunctionPtr function;
String type_name;
public:
static constexpr bool is_parametric = true;
SerializationAggregateFunction(const AggregateFunctionPtr & function_): function(function_) {}
SerializationAggregateFunction(const AggregateFunctionPtr & function_, String type_name_)
: function(function_), type_name(std::move(type_name_)) {}
/// NOTE These two functions for serializing single values are incompatible with the functions below.
void serializeBinary(const Field & field, WriteBuffer & ostr) const override;

View File

@ -55,7 +55,7 @@ public:
cells.resize_fill(cells_size);
size_overlap_mask = cells_size - 1;
setup(dictionary_structure);
createAttributes(dictionary_structure);
}
bool returnsFetchedColumnsInOrderOfRequestedKeys() const override { return true; }
@ -226,23 +226,17 @@ private:
auto & attribute = attributes[attribute_index];
const auto & default_value_provider = fetch_request.defaultValueProviderAtIndex(attribute_index);
size_t fetched_keys_size = fetched_keys.size();
auto & fetched_column = *result.fetched_columns[attribute_index];
fetched_column.reserve(fetched_keys_size);
fetched_column.reserve(fetched_columns_index);
if (unlikely(attribute.is_complex_type))
if (unlikely(attribute.is_nullable))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
fetched_column.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
fetched_column.insert(container[fetched_key.element_index]);
}
getItemsForFetchedKeys<Field>(
attribute,
fetched_columns_index,
fetched_keys,
[&](Field & value) { fetched_column.insert(value); },
default_value_provider);
}
else
{
@ -250,46 +244,40 @@ private:
{
using Type = std::decay_t<decltype(dictionary_attribute_type)>;
using AttributeType = typename Type::AttributeType;
using ColumnProvider = DictionaryAttributeColumnProvider<AttributeType>;
using ColumnType = typename ColumnProvider::ColumnType;
using ValueType = DictionaryValueType<AttributeType>;
using ColumnType =
std::conditional_t<std::is_same_v<AttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<AttributeType>, ColumnDecimal<ValueType>,
ColumnVector<AttributeType>>>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
ColumnType & column_typed = static_cast<ColumnType &>(fetched_column);
if constexpr (std::is_same_v<ColumnType, ColumnString>)
if constexpr (std::is_same_v<ValueType, Array>)
{
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
column_typed.insertData(item.data, item.size);
}
}
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](Array & value) { fetched_column.insert(value); },
default_value_provider);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](StringRef value) { fetched_column.insertData(value.data, value.size); },
default_value_provider);
}
else
{
auto & data = column_typed.getData();
for (size_t fetched_key_index = 0; fetched_key_index < fetched_columns_index; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
column_typed.insert(default_value_provider.getDefaultValue(fetched_key_index));
else
{
auto item = container[fetched_key.element_index];
data.push_back(item);
}
}
getItemsForFetchedKeys<ValueType>(
attribute,
fetched_columns_index,
fetched_keys,
[&](auto value) { data.push_back(value); },
default_value_provider);
}
};
@ -339,7 +327,9 @@ private:
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
{
container.back() = column_value;
}
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
@ -348,7 +338,9 @@ private:
container.back() = inserted_value;
}
else
{
container.back() = column_value.get<NearestFieldType<ElementType>>();
}
});
}
@ -382,7 +374,9 @@ private:
column->get(key_index, column_value);
if constexpr (std::is_same_v<ElementType, Field>)
{
container[index_to_use] = column_value;
}
else if constexpr (std::is_same_v<ElementType, StringRef>)
{
const String & string_value = column_value.get<String>();
@ -398,7 +392,9 @@ private:
container[index_to_use] = inserted_value;
}
else
{
container[index_to_use] = column_value.get<NearestFieldType<ElementType>>();
}
});
}
}
@ -504,9 +500,9 @@ private:
auto & attribute = attributes[attribute_index];
auto & attribute_type = attribute.type;
if (unlikely(attribute.is_complex_type))
if (unlikely(attribute.is_nullable))
{
auto & container = std::get<std::vector<Field>>(attribute.attribute_container);
auto & container = std::get<ContainerType<Field>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
}
else
@ -517,7 +513,7 @@ private:
using AttributeType = typename Type::AttributeType;
using ValueType = DictionaryValueType<AttributeType>;
auto & container = std::get<PaddedPODArray<ValueType>>(attribute.attribute_container);
auto & container = std::get<ContainerType<ValueType>>(attribute.attribute_container);
std::forward<GetContainerFunc>(func)(container);
};
@ -541,7 +537,82 @@ private:
return updated_value;
}
void setup(const DictionaryStructure & dictionary_structure)
template<typename ValueType>
using ContainerType = std::conditional_t<
std::is_same_v<ValueType, Field> || std::is_same_v<ValueType, Array>,
std::vector<ValueType>,
PaddedPODArray<ValueType>>;
struct Attribute
{
AttributeUnderlyingType type;
bool is_nullable;
std::variant<
ContainerType<UInt8>,
ContainerType<UInt16>,
ContainerType<UInt32>,
ContainerType<UInt64>,
ContainerType<UInt128>,
ContainerType<UInt256>,
ContainerType<Int8>,
ContainerType<Int16>,
ContainerType<Int32>,
ContainerType<Int64>,
ContainerType<Int128>,
ContainerType<Int256>,
ContainerType<Decimal32>,
ContainerType<Decimal64>,
ContainerType<Decimal128>,
ContainerType<Decimal256>,
ContainerType<Float32>,
ContainerType<Float64>,
ContainerType<UUID>,
ContainerType<StringRef>,
ContainerType<Array>,
ContainerType<Field>> attribute_container;
};
template <typename ValueType, typename ValueSetter>
void getItemsForFetchedKeys(
Attribute & attribute,
size_t fetched_keys_size,
PaddedPODArray<FetchedKey> & fetched_keys,
ValueSetter && value_setter,
const DefaultValueProvider & default_value_provider)
{
auto & container = std::get<ContainerType<ValueType>>(attribute.attribute_container);
for (size_t fetched_key_index = 0; fetched_key_index < fetched_keys_size; ++fetched_key_index)
{
auto fetched_key = fetched_keys[fetched_key_index];
if (unlikely(fetched_key.is_default))
{
auto default_value = default_value_provider.getDefaultValue(fetched_key_index);
if constexpr (std::is_same_v<ValueType, Field>)
{
value_setter(default_value);
}
else if constexpr (std::is_same_v<ValueType, StringRef>)
{
auto & value = default_value.get<String>();
value_setter(value);
}
else
{
value_setter(default_value.get<ValueType>());
}
}
else
{
value_setter(container[fetched_key.element_index]);
}
}
}
void createAttributes(const DictionaryStructure & dictionary_structure)
{
/// For each dictionary attribute create storage attribute
/// For simple attributes create PODArray, for complex vector of Fields
@ -561,12 +632,12 @@ private:
attributes.emplace_back();
auto & last_attribute = attributes.back();
last_attribute.type = attribute_type;
last_attribute.is_complex_type = dictionary_attribute.is_nullable || dictionary_attribute.is_array;
last_attribute.is_nullable = dictionary_attribute.is_nullable;
if (dictionary_attribute.is_nullable)
last_attribute.attribute_container = std::vector<Field>();
last_attribute.attribute_container = ContainerType<Field>();
else
last_attribute.attribute_container = PaddedPODArray<ValueType>();
last_attribute.attribute_container = ContainerType<ValueType>();
};
callOnDictionaryAttributeType(attribute_type, type_call);
@ -583,35 +654,6 @@ private:
time_t deadline;
};
struct Attribute
{
AttributeUnderlyingType type;
bool is_complex_type;
std::variant<
PaddedPODArray<UInt8>,
PaddedPODArray<UInt16>,
PaddedPODArray<UInt32>,
PaddedPODArray<UInt64>,
PaddedPODArray<UInt128>,
PaddedPODArray<UInt256>,
PaddedPODArray<Int8>,
PaddedPODArray<Int16>,
PaddedPODArray<Int32>,
PaddedPODArray<Int64>,
PaddedPODArray<Int128>,
PaddedPODArray<Int256>,
PaddedPODArray<Decimal32>,
PaddedPODArray<Decimal64>,
PaddedPODArray<Decimal128>,
PaddedPODArray<Decimal256>,
PaddedPODArray<Float32>,
PaddedPODArray<Float64>,
PaddedPODArray<UUID>,
PaddedPODArray<StringRef>,
std::vector<Field>> attribute_container;
};
CacheDictionaryStorageConfiguration configuration;
pcg64 rnd_engine;

View File

@ -93,7 +93,7 @@ std::string ClickHouseDictionarySource::getUpdateFieldAndDate()
{
if (update_time != std::chrono::system_clock::from_time_t(0))
{
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - 1;
time_t hr_time = std::chrono::system_clock::to_time_t(update_time) - configuration.update_lag;
std::string str_time = DateLUT::instance().timeToString(hr_time);
update_time = std::chrono::system_clock::now();
return query_builder.composeUpdateQuery(configuration.update_field, str_time);
@ -222,7 +222,8 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
std::string host = config.getString(settings_config_prefix + ".host", "localhost");
UInt16 port = static_cast<UInt16>(config.getUInt(settings_config_prefix + ".port", default_port));
ClickHouseDictionarySource::Configuration configuration {
ClickHouseDictionarySource::Configuration configuration
{
.secure = config.getBool(settings_config_prefix + ".secure", false),
.host = host,
.port = port,
@ -231,8 +232,9 @@ void registerDictionarySourceClickHouse(DictionarySourceFactory & factory)
.db = config.getString(settings_config_prefix + ".db", default_database),
.table = config.getString(settings_config_prefix + ".table"),
.where = config.getString(settings_config_prefix + ".where", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.invalidate_query = config.getString(settings_config_prefix + ".invalidate_query", ""),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
.is_local = isLocalAddress({host, port}, default_port)
};

View File

@ -28,8 +28,9 @@ public:
const std::string db;
const std::string table;
const std::string where;
const std::string update_field;
const std::string invalidate_query;
const std::string update_field;
const UInt64 update_lag;
const bool is_local;
};

View File

@ -6,8 +6,11 @@
#include <Columns/ColumnDecimal.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnArray.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeNullable.h>
#include <Core/Block.h>
#include <Dictionaries/IDictionary.h>
#include <Dictionaries/DictionaryStructure.h>
@ -231,14 +234,27 @@ class DictionaryAttributeColumnProvider
{
public:
using ColumnType =
std::conditional_t<std::is_same_v<DictionaryAttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<DictionaryAttributeType>, ColumnDecimal<DictionaryAttributeType>,
ColumnVector<DictionaryAttributeType>>>;
std::conditional_t<std::is_same_v<DictionaryAttributeType, Array>, ColumnArray,
std::conditional_t<std::is_same_v<DictionaryAttributeType, String>, ColumnString,
std::conditional_t<IsDecimalNumber<DictionaryAttributeType>, ColumnDecimal<DictionaryAttributeType>,
ColumnVector<DictionaryAttributeType>>>>;
using ColumnPtr = typename ColumnType::MutablePtr;
static ColumnPtr getColumn(const DictionaryAttribute & dictionary_attribute, size_t size)
{
if constexpr (std::is_same_v<DictionaryAttributeType, Array>)
{
if (const auto * array_type = typeid_cast<const DataTypeArray *>(dictionary_attribute.type.get()))
{
auto nested_column = array_type->getNestedType()->createColumn();
return ColumnArray::create(std::move(nested_column));
}
else
{
throw Exception(ErrorCodes::TYPE_MISMATCH, "Unsupported attribute type.");
}
}
if constexpr (std::is_same_v<DictionaryAttributeType, String>)
{
return ColumnType::create();
@ -249,7 +265,8 @@ public:
}
else if constexpr (IsDecimalNumber<DictionaryAttributeType>)
{
auto scale = getDecimalScale(*dictionary_attribute.nested_type);
auto nested_type = removeNullable(dictionary_attribute.type);
auto scale = getDecimalScale(*nested_type);
return ColumnType::create(size, scale);
}
else if constexpr (is_arithmetic_v<DictionaryAttributeType>)
@ -280,18 +297,18 @@ public:
: default_value(std::move(attribute_default_value))
{
if (default_values_column_ == nullptr)
use_default_value_from_column = false;
use_attribute_default_value = true;
else
{
if (const auto * const default_col = checkAndGetColumn<DefaultColumnType>(*default_values_column_))
{
default_values_column = default_col;
use_default_value_from_column = true;
use_attribute_default_value = false;
}
else if (const auto * const default_col_const = checkAndGetColumnConst<DefaultColumnType>(default_values_column_.get()))
{
default_value = default_col_const->template getValue<DictionaryAttributeType>();
use_default_value_from_column = false;
use_attribute_default_value = true;
}
else
throw Exception(ErrorCodes::TYPE_MISMATCH, "Type of default column is not the same as dictionary attribute type.");
@ -300,12 +317,17 @@ public:
DefaultValueType operator[](size_t row)
{
if (!use_default_value_from_column)
if (use_attribute_default_value)
return static_cast<DefaultValueType>(default_value);
assert(default_values_column != nullptr);
if constexpr (std::is_same_v<DefaultColumnType, ColumnString>)
if constexpr (std::is_same_v<DefaultColumnType, ColumnArray>)
{
Field field = (*default_values_column)[row];
return field.get<Array>();
}
else if constexpr (std::is_same_v<DefaultColumnType, ColumnString>)
return default_values_column->getDataAt(row);
else
return default_values_column->getData()[row];
@ -313,7 +335,7 @@ public:
private:
DictionaryAttributeType default_value;
const DefaultColumnType * default_values_column = nullptr;
bool use_default_value_from_column = false;
bool use_attribute_default_value = false;
};
template <DictionaryKeyType key_type>

View File

@ -25,9 +25,10 @@ namespace
Block block;
if (dict_struct.id)
{
block.insert(ColumnWithTypeAndName{ColumnUInt64::create(1, 0), std::make_shared<DataTypeUInt64>(), dict_struct.id->name});
if (dict_struct.key)
}
else if (dict_struct.key)
{
for (const auto & attribute : *dict_struct.key)
{

View File

@ -79,9 +79,7 @@ AttributeUnderlyingType getAttributeUnderlyingType(const DataTypePtr & type)
case TypeIndex::String: return AttributeUnderlyingType::String;
// Temporary hack to allow arrays in keys, since they are never retrieved for polygon dictionaries.
// TODO: This should be fixed by fully supporting arrays in dictionaries.
case TypeIndex::Array: return AttributeUnderlyingType::String;
case TypeIndex::Array: return AttributeUnderlyingType::Array;
default: break;
}
@ -125,7 +123,7 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
id.emplace(config, structure_prefix + ".id");
else if (has_key)
{
key.emplace(getAttributes(config, structure_prefix + ".key", true));
key.emplace(getAttributes(config, structure_prefix + ".key", /*complex_key_attributes =*/ true));
if (key->empty())
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Empty 'key' supplied");
}
@ -173,7 +171,7 @@ DictionaryStructure::DictionaryStructure(const Poco::Util::AbstractConfiguration
has_expressions = true;
}
attributes = getAttributes(config, structure_prefix, false);
attributes = getAttributes(config, structure_prefix, /*complex_key_attributes =*/ false);
for (size_t i = 0; i < attributes.size(); ++i)
{
@ -375,17 +373,10 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
const auto type_string = config.getString(prefix + "type");
const auto initial_type = DataTypeFactory::instance().get(type_string);
auto type = initial_type;
bool is_array = false;
bool is_nullable = false;
bool is_nullable = initial_type->isNullable();
if (type->isNullable())
{
is_nullable = true;
type = removeNullable(type);
}
const auto underlying_type = getAttributeUnderlyingType(type);
auto non_nullable_type = removeNullable(initial_type);
const auto underlying_type = getAttributeUnderlyingType(non_nullable_type);
const auto expression = config.getString(prefix + "expression", "");
if (!expression.empty())
@ -394,26 +385,27 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
Field null_value;
if (allow_null_values)
{
/// TODO: Fix serialization for nullable type.
const auto null_value_string = config.getString(prefix + "null_value");
try
{
if (null_value_string.empty())
{
null_value = type->getDefault();
null_value = non_nullable_type->getDefault();
}
else
{
ReadBufferFromString null_value_buffer{null_value_string};
auto column_with_null_value = type->createColumn();
type->getDefaultSerialization()->deserializeTextEscaped(*column_with_null_value, null_value_buffer, format_settings);
auto column_with_null_value = non_nullable_type->createColumn();
non_nullable_type->getDefaultSerialization()->deserializeTextEscaped(*column_with_null_value, null_value_buffer, format_settings);
null_value = (*column_with_null_value)[0];
}
}
catch (Exception & e)
{
String dictionary_name = config.getString(".dictionary.name", "");
e.addMessage("While parsing null_value for attribute with name " + name
+ " in dictionary " + dictionary_name);
e.addMessage(fmt::format("While parsing null_value for attribute with name {} in dictionary {}", name, dictionary_name));
throw;
}
}
@ -436,15 +428,12 @@ std::vector<DictionaryAttribute> DictionaryStructure::getAttributes(
name,
underlying_type,
initial_type,
initial_type->getDefaultSerialization(),
type,
expression,
null_value,
hierarchical,
injective,
is_object_id,
is_nullable,
is_array});
is_nullable});
}
return res_attributes;

View File

@ -39,6 +39,7 @@
M(Decimal256) \
M(UUID) \
M(String) \
M(Array) \
namespace DB
@ -74,15 +75,12 @@ struct DictionaryAttribute final
const std::string name;
const AttributeUnderlyingType underlying_type;
const DataTypePtr type;
const SerializationPtr serialization;
const DataTypePtr nested_type;
const std::string expression;
const Field null_value;
const bool hierarchical;
const bool injective;
const bool is_object_id;
const bool is_nullable;
const bool is_array;
};
template <typename Type>
@ -92,7 +90,7 @@ struct DictionaryAttributeType
};
template <typename F>
void callOnDictionaryAttributeType(AttributeUnderlyingType type, F&& func)
void callOnDictionaryAttributeType(AttributeUnderlyingType type, F && func)
{
switch (type)
{

View File

@ -60,16 +60,12 @@ namespace
ExecutableDictionarySource::ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
ContextConstPtr context_)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, dict_struct{dict_struct_}
, implicit_key{config.getBool(config_prefix + ".implicit_key", false)}
, command{config.getString(config_prefix + ".command")}
, update_field{config.getString(config_prefix + ".update_field", "")}
, format{config.getString(config_prefix + ".format")}
, dict_struct(dict_struct_)
, configuration(configuration_)
, sample_block{sample_block_}
, context(context_)
{
@ -77,7 +73,7 @@ ExecutableDictionarySource::ExecutableDictionarySource(
/// these columns will not be returned from source
/// Implicit key means that the source script will return only values,
/// and the correspondence to the requested keys is determined implicitly - by the order of rows in the result.
if (implicit_key)
if (configuration.implicit_key)
{
auto keys_names = dict_struct.getKeysNames();
@ -91,43 +87,40 @@ ExecutableDictionarySource::ExecutableDictionarySource(
ExecutableDictionarySource::ExecutableDictionarySource(const ExecutableDictionarySource & other)
: log(&Poco::Logger::get("ExecutableDictionarySource"))
, update_time{other.update_time}
, dict_struct{other.dict_struct}
, implicit_key{other.implicit_key}
, command{other.command}
, update_field{other.update_field}
, format{other.format}
, sample_block{other.sample_block}
, update_time(other.update_time)
, dict_struct(other.dict_struct)
, configuration(other.configuration)
, sample_block(other.sample_block)
, context(Context::createCopy(other.context))
{
}
BlockInputStreamPtr ExecutableDictionarySource::loadAll()
{
if (implicit_key)
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadAll method");
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context->getInputFormat(format, process->out, sample_block, max_block_size);
auto process = ShellCommand::execute(configuration.command);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
{
if (implicit_key)
if (configuration.implicit_key)
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutableDictionarySource with implicit_key does not support loadUpdatedAll method");
time_t new_update_time = time(nullptr);
SCOPE_EXIT(update_time = new_update_time);
std::string command_with_update_field = command;
std::string command_with_update_field = configuration.command;
if (update_time)
command_with_update_field += " " + update_field + " " + DB::toString(LocalDateTime(update_time - 1));
command_with_update_field += " " + configuration.update_field + " " + DB::toString(LocalDateTime(update_time - configuration.update_lag));
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field);
auto input_stream = context->getInputFormat(format, process->out, sample_block, max_block_size);
auto input_stream = context->getInputFormat(configuration.format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
@ -220,15 +213,15 @@ BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_col
BlockInputStreamPtr ExecutableDictionarySource::getStreamForBlock(const Block & block)
{
auto stream = std::make_unique<BlockInputStreamWithBackgroundThread>(
context, format, sample_block, command, log,
context, configuration.format, sample_block, configuration.command, log,
[block, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context->getOutputStream(format, out, block.cloneEmpty());
auto output_stream = context->getOutputStream(configuration.format, out, block.cloneEmpty());
formatBlock(output_stream, block);
out.close();
});
if (implicit_key)
if (configuration.implicit_key)
return std::make_shared<BlockInputStreamWithAdditionalColumns>(block, std::move(stream));
else
return std::shared_ptr<BlockInputStreamWithBackgroundThread>(stream.release());
@ -246,7 +239,7 @@ bool ExecutableDictionarySource::supportsSelectiveLoad() const
bool ExecutableDictionarySource::hasUpdateField() const
{
return !update_field.empty();
return !configuration.update_field.empty();
}
DictionarySourcePtr ExecutableDictionarySource::clone() const
@ -256,7 +249,7 @@ DictionarySourcePtr ExecutableDictionarySource::clone() const
std::string ExecutableDictionarySource::toString() const
{
return "Executable: " + command;
return "Executable: " + configuration.command;
}
void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
@ -280,10 +273,20 @@ void registerDictionarySourceExecutable(DictionarySourceFactory & factory)
auto context_local_copy = copyContextAndApplySettings(config_prefix, context, config);
return std::make_unique<ExecutableDictionarySource>(
dict_struct, config, config_prefix + ".executable",
sample_block, context_local_copy);
std::string settings_config_prefix = config_prefix + ".executable";
ExecutableDictionarySource::Configuration configuration
{
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
.command = config.getString(settings_config_prefix + ".command"),
.format = config.getString(settings_config_prefix + ".format"),
.update_field = config.getString(settings_config_prefix + ".update_field", ""),
.update_lag = config.getUInt64(settings_config_prefix + ".update_lag", 1),
};
return std::make_unique<ExecutableDictionarySource>(dict_struct, configuration, sample_block, context_local_copy);
};
factory.registerSource("executable", create_table_source);
}

View File

@ -15,10 +15,19 @@ namespace DB
class ExecutableDictionarySource final : public IDictionarySource
{
public:
struct Configuration
{
bool implicit_key;
const std::string command;
const std::string format;
const std::string update_field;
const UInt64 update_lag;
};
ExecutableDictionarySource(
const DictionaryStructure & dict_struct_,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const Configuration & configuration_,
Block & sample_block_,
ContextConstPtr context_);
@ -53,10 +62,7 @@ private:
Poco::Logger * log;
time_t update_time = 0;
const DictionaryStructure dict_struct;
bool implicit_key;
const std::string command;
const std::string update_field;
const std::string format;
const Configuration configuration;
Block sample_block;
ContextConstPtr context;
};

View File

@ -70,12 +70,12 @@ ExecutablePoolDictionarySource::ExecutablePoolDictionarySource(const ExecutableP
BlockInputStreamPtr ExecutablePoolDictionarySource::loadAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadAll method");
}
BlockInputStreamPtr ExecutablePoolDictionarySource::loadUpdatedAll()
{
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource with implicit_key does not support loadAll method");
throw Exception(ErrorCodes::UNSUPPORTED_METHOD, "ExecutablePoolDictionarySource does not support loadUpdatedAll method");
}
namespace
@ -254,7 +254,7 @@ bool ExecutablePoolDictionarySource::supportsSelectiveLoad() const
bool ExecutablePoolDictionarySource::hasUpdateField() const
{
return !configuration.update_field.empty();
return false;
}
DictionarySourcePtr ExecutablePoolDictionarySource::clone() const
@ -295,9 +295,9 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
settings_no_parallel_parsing.input_format_parallel_parsing = false;
context_local_copy->setSettings(settings_no_parallel_parsing);
String configuration_config_prefix = config_prefix + ".executable_pool";
String settings_config_prefix = config_prefix + ".executable_pool";
size_t max_command_execution_time = config.getUInt64(configuration_config_prefix + ".max_command_execution_time", 10);
size_t max_command_execution_time = config.getUInt64(settings_config_prefix + ".max_command_execution_time", 10);
size_t max_execution_time_seconds = static_cast<size_t>(context->getSettings().max_execution_time.totalSeconds());
if (max_execution_time_seconds != 0 && max_command_execution_time > max_execution_time_seconds)
@ -305,12 +305,11 @@ void registerDictionarySourceExecutablePool(DictionarySourceFactory & factory)
ExecutablePoolDictionarySource::Configuration configuration
{
.command = config.getString(configuration_config_prefix + ".command"),
.format = config.getString(configuration_config_prefix + ".format"),
.pool_size = config.getUInt64(configuration_config_prefix + ".size"),
.update_field = config.getString(configuration_config_prefix + ".update_field", ""),
.implicit_key = config.getBool(configuration_config_prefix + ".implicit_key", false),
.command_termination_timeout = config.getUInt64(configuration_config_prefix + ".command_termination_timeout", 10),
.command = config.getString(settings_config_prefix + ".command"),
.format = config.getString(settings_config_prefix + ".format"),
.pool_size = config.getUInt64(settings_config_prefix + ".size"),
.implicit_key = config.getBool(settings_config_prefix + ".implicit_key", false),
.command_termination_timeout = config.getUInt64(settings_config_prefix + ".command_termination_timeout", 10),
.max_command_execution_time = max_command_execution_time
};

Some files were not shown because too many files have changed in this diff Show More