diff --git a/CHANGELOG.md b/CHANGELOG.md
index 65a81346037..9d37fe182f9 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -2,6 +2,7 @@
#### Upgrade Notes
+* One bug has been found after release: [#25187](https://github.com/ClickHouse/ClickHouse/issues/25187).
* Do not upgrade if you have a partition key with `UUID`.
* The `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to the update of the compression algorithm, and you can ignore them. They are informational and do not indicate any kind of undesired behaviour.
* The setting `compile_expressions` is enabled by default. Although it has been heavily tested on a variety of scenarios, if you find some undesired behaviour on your servers, you can try turning this setting off.
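  For example, it can be switched off for a session (a minimal illustration; the same setting can be placed in a user profile to make it permanent):

  ``` sql
  SET compile_expressions = 0;
  ```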
diff --git a/contrib/croaring b/contrib/croaring
index d8402939b5c..2c867e9f9c9 160000
--- a/contrib/croaring
+++ b/contrib/croaring
@@ -1 +1 @@
-Subproject commit d8402939b5c9fc134fd4fcf058fe0f7006d2b129
+Subproject commit 2c867e9f9c9e2a3a7032791f94c4c7ae3013f6e0
diff --git a/debian/clickhouse-server.cron.d b/debian/clickhouse-server.cron.d
index 03bbd620aa7..1e5d4aab733 100644
--- a/debian/clickhouse-server.cron.d
+++ b/debian/clickhouse-server.cron.d
@@ -1 +1 @@
-#*/10 * * * * root (which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart > /dev/null 2>&1
+#*/10 * * * * root ((which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart) > /dev/null 2>&1
diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh
index 670fc9e58b3..c5e457726ef 100755
--- a/docker/test/fuzzer/run-fuzzer.sh
+++ b/docker/test/fuzzer/run-fuzzer.sh
@@ -97,14 +97,10 @@ function fuzz
NEW_TESTS_OPT="${NEW_TESTS_OPT:-}"
fi
+ export CLICKHOUSE_WATCHDOG_ENABLE=0 # interferes with gdb
clickhouse-server --config-file db/config.xml -- --path db 2>&1 | tail -100000 > server.log &
-
server_pid=$!
kill -0 $server_pid
- while ! clickhouse-client --query "select 1" && kill -0 $server_pid ; do echo . ; sleep 1 ; done
- clickhouse-client --query "select 1"
- kill -0 $server_pid
- echo Server started
echo "
handle all noprint
@@ -115,12 +111,31 @@ thread apply all backtrace
continue
" > script.gdb
- gdb -batch -command script.gdb -p "$(pidof clickhouse-server)" &
+ gdb -batch -command script.gdb -p $server_pid &
+
+    # Check connectivity after we attach gdb, because attaching might cause the
+    # server to freeze, and then the fuzzer would fail.
+ for _ in {1..60}
+ do
+ sleep 1
+ if clickhouse-client --query "select 1"
+ then
+ break
+ fi
+ done
+ clickhouse-client --query "select 1" # This checks that the server is responding
+ kill -0 $server_pid # This checks that it is our server that is started and not some other one
+ echo Server started and responded
# SC2012: Use find instead of ls to better handle non-alphanumeric filenames. They are all alphanumeric.
# SC2046: Quote this to prevent word splitting. Actually I need word splitting.
# shellcheck disable=SC2012,SC2046
- clickhouse-client --query-fuzzer-runs=1000 --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) $NEW_TESTS_OPT \
+ clickhouse-client \
+ --receive_timeout=10 \
+ --receive_data_timeout_ms=10000 \
+ --query-fuzzer-runs=1000 \
+ --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
+ $NEW_TESTS_OPT \
> >(tail -n 100000 > fuzzer.log) \
2>&1 &
fuzzer_pid=$!
@@ -198,13 +213,17 @@ continue
echo "success" > status.txt
echo "OK" > description.txt
else
- # The server was alive, but the fuzzer returned some error. Probably this
- # is a problem in the fuzzer itself. Don't grep the server log in this
- # case, because we will find a message about normal server termination
- # (Received signal 15), which is confusing.
+ # The server was alive, but the fuzzer returned some error. This might
+ # be some client-side error detected by fuzzing, or a problem in the
+ # fuzzer itself. Don't grep the server log in this case, because we will
+ # find a message about normal server termination (Received signal 15),
+ # which is confusing.
task_exit_code=$fuzzer_exit_code
echo "failure" > status.txt
- echo "Fuzzer failed ($fuzzer_exit_code). See the logs." > description.txt
+ { grep -o "Found error:.*" fuzzer.log \
+ || grep -o "Exception.*" fuzzer.log \
+ || echo "Fuzzer failed ($fuzzer_exit_code). See the logs." ; } \
+ | tail -1 > description.txt
fi
}
diff --git a/docker/test/performance-comparison/compare.sh b/docker/test/performance-comparison/compare.sh
index a027a94ab70..2621a894dd7 100755
--- a/docker/test/performance-comparison/compare.sh
+++ b/docker/test/performance-comparison/compare.sh
@@ -554,12 +554,6 @@ create table query_metric_stats_denorm engine File(TSVWithNamesAndTypes,
" 2> >(tee -a analyze/errors.log 1>&2)
# Fetch historical query variability thresholds from the CI database
-clickhouse-local --query "
- left join file('analyze/report-thresholds.tsv', TSV,
- 'test text, report_threshold float') thresholds
- on query_metric_stats.test = thresholds.test
-"
-
if [ -v CHPC_DATABASE_URL ]
then
set +x # Don't show password in the log
@@ -577,7 +571,8 @@ then
--date_time_input_format=best_effort)
-# Precision is going to be 1.5 times worse for PRs. How do I know it? I ran this:
+# Precision is going to be 1.5 times worse for PRs, because we run the queries
+# fewer times. How do I know? I ran this:
# SELECT quantilesExact(0., 0.1, 0.5, 0.75, 0.95, 1.)(p / m)
# FROM
# (
@@ -592,19 +587,27 @@ then
# query_display_name
# HAVING count(*) > 100
# )
-# The file can be empty if the server is inaccessible, so we can't use TSVWithNamesAndTypes.
+#
+# The file can be empty if the server is inaccessible, so we can't use
+# TSVWithNamesAndTypes.
+#
"${client[@]}" --query "
select test, query_index,
- quantileExact(0.99)(abs(diff)) max_diff,
- quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 max_stat_threshold,
+ quantileExact(0.99)(abs(diff)) * 1.5 AS max_diff,
+ quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 AS max_stat_threshold,
query_display_name
from query_metrics_v2
- where event_date > now() - interval 1 month
+    -- We use results that are at least one week old, so that the current
+    -- changes do not immediately influence the statistics, and we have
+    -- some time to notice that something is wrong.
+ where event_date between now() - interval 1 month - interval 1 week
+ and now() - interval 1 week
and metric = 'client_time'
and pr_number = 0
group by test, query_index, query_display_name
having count(*) > 100
" > analyze/historical-thresholds.tsv
+ set -x
else
touch analyze/historical-thresholds.tsv
fi
@@ -1224,6 +1227,55 @@ unset IFS
function upload_results
{
+ # Prepare info for the CI checks table.
+ rm ci-checks.tsv
+ clickhouse-local --query "
+create view queries as select * from file('report/queries.tsv', TSVWithNamesAndTypes,
+ 'changed_fail int, changed_show int, unstable_fail int, unstable_show int,
+ left float, right float, diff float, stat_threshold float,
+ test text, query_index int, query_display_name text');
+
+create table ci_checks engine File(TSVWithNamesAndTypes, 'ci-checks.tsv')
+ as select
+ $PR_TO_TEST pull_request_number,
+ '$SHA_TO_TEST' commit_sha,
+ 'Performance' check_name,
+        '$(sed -n 's/.*<!--status: \(.*\)-->/\1/p' report.html)' check_status,
+ -- TODO toDateTime() can't parse output of 'date', so no time for now.
+ ($(date +%s) - $CHPC_CHECK_START_TIMESTAMP) * 1000 check_duration_ms,
+ fromUnixTimestamp($CHPC_CHECK_START_TIMESTAMP) check_start_time,
+ test_name,
+ test_status,
+ test_duration_ms,
+ report_url,
+ $PR_TO_TEST = 0
+ ? 'https://github.com/ClickHouse/ClickHouse/commit/$SHA_TO_TEST'
+ : 'https://github.com/ClickHouse/ClickHouse/pull/$PR_TO_TEST' pull_request_url,
+ '' commit_url,
+ '' task_url,
+ '' base_ref,
+ '' base_repo,
+ '' head_ref,
+ '' head_repo
+ from (
+ select '' test_name,
+            '$(sed -n 's/.*<!--message: \(.*\)-->/\1/p' report.html)' test_status,
+ 0 test_duration_ms,
+ 'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
+ union all
+ select test || ' #' || toString(query_index), 'slower' test_status, 0 test_duration_ms,
+ 'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
+ || test || '.' || toString(query_index) report_url
+ from queries where changed_fail != 0 and diff > 0
+ union all
+ select test || ' #' || toString(query_index), 'unstable' test_status, 0 test_duration_ms,
+ 'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
+ || test || '.' || toString(query_index) report_url
+ from queries where unstable_fail != 0
+ )
+;
+ "
+
if ! [ -v CHPC_DATABASE_URL ]
then
echo Database for test results is not specified, will not upload them.
@@ -1292,6 +1344,10 @@ $REF_SHA $SHA_TO_TEST $(numactl --show | sed -n 's/^cpubind:[[:space:]]\+/numact
$REF_SHA $SHA_TO_TEST $(numactl --hardware | sed -n 's/^available:[[:space:]]\+/numactl-available /p')
EOF
+ # Also insert some data about the check into the CI checks table.
+ "${client[@]}" --query "INSERT INTO "'"'"gh-data"'"'".checks FORMAT TSVWithNamesAndTypes" \
+ < ci-checks.tsv
+
set -x
}
diff --git a/docker/test/performance-comparison/entrypoint.sh b/docker/test/performance-comparison/entrypoint.sh
index 570a1c21514..614debce1c1 100755
--- a/docker/test/performance-comparison/entrypoint.sh
+++ b/docker/test/performance-comparison/entrypoint.sh
@@ -1,6 +1,9 @@
#!/bin/bash
set -ex
+CHPC_CHECK_START_TIMESTAMP="$(date +%s)"
+export CHPC_CHECK_START_TIMESTAMP
+
# Use the packaged repository to find the revision we will compare to.
function find_reference_sha
{
diff --git a/docker/test/performance-comparison/report.py b/docker/test/performance-comparison/report.py
index dabf6b7b93d..b69a1e0d3f6 100755
--- a/docker/test/performance-comparison/report.py
+++ b/docker/test/performance-comparison/report.py
@@ -561,8 +561,9 @@ if args.report == 'main':
# Don't show mildly unstable queries, only the very unstable ones we
# treat as errors.
if very_unstable_queries:
- error_tests += very_unstable_queries
- status = 'failure'
+ if very_unstable_queries > 3:
+ error_tests += very_unstable_queries
+ status = 'failure'
message_array.append(str(very_unstable_queries) + ' unstable')
error_tests += slow_average_tests
diff --git a/docker/test/testflows/runner/dockerd-entrypoint.sh b/docker/test/testflows/runner/dockerd-entrypoint.sh
index 01593488648..8abbd9e1c8e 100755
--- a/docker/test/testflows/runner/dockerd-entrypoint.sh
+++ b/docker/test/testflows/runner/dockerd-entrypoint.sh
@@ -1,6 +1,15 @@
#!/bin/bash
set -e
+echo "Configure to use Yandex dockerhub-proxy"
+mkdir -p /etc/docker/
+cat > /etc/docker/daemon.json << EOF
+{
+ "insecure-registries" : ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
+ "registry-mirrors" : ["http://dockerhub-proxy.sas.yp-c.yandex.net:5000"]
+}
+EOF
+
dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 &>/var/log/somefile &
set +e
@@ -16,14 +25,6 @@ while true; do
done
set -e
-echo "Configure to use Yandex dockerhub-proxy"
-cat > /etc/docker/daemon.json << EOF
-{
- "insecure-registries": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
- "registry-mirrors": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"]
-}
-EOF
-
echo "Start tests"
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse
diff --git a/docs/en/getting-started/install.md b/docs/en/getting-started/install.md
index 9a4848a3ef0..4256de49e4a 100644
--- a/docs/en/getting-started/install.md
+++ b/docs/en/getting-started/install.md
@@ -94,6 +94,15 @@ For production environments, it’s recommended to use the latest `stable`-versi
To run ClickHouse inside Docker follow the guide on [Docker Hub](https://hub.docker.com/r/yandex/clickhouse-server/). Those images use official `deb` packages inside.
+### Single Binary
+
+You can install ClickHouse on Linux using a single portable binary from the latest commit of the `master` branch: [https://builds.clickhouse.tech/master/amd64/clickhouse](https://builds.clickhouse.tech/master/amd64/clickhouse).
+
+``` bash
+curl -O 'https://builds.clickhouse.tech/master/amd64/clickhouse' && chmod a+x clickhouse
+sudo ./clickhouse install
+```
+
### From Precompiled Binaries for Non-Standard Environments {#from-binaries-non-linux}
For non-Linux operating systems and for the AArch64 CPU architecture, ClickHouse builds are provided as a cross-compiled binary from the latest commit of the `master` branch (with a few hours' delay).
@@ -104,7 +113,7 @@ For non-Linux operating systems and for AArch64 CPU arhitecture, ClickHouse buil
After downloading, you can use the `clickhouse client` to connect to the server, or `clickhouse local` to process local data.
-Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (also with needed configuration files, configuring users etc.). After that run `clickhouse start` commands to start the clickhouse-server and `clickhouse-client` to connect to it.
+Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (it also installs the needed configuration files and configures users, etc.). After that, run `clickhouse start` to start the clickhouse-server and `clickhouse-client` to connect to it.
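For illustration, a typical sequence after downloading the binary might look like the sketch below (depending on your setup, the start command may need to be run with `sudo` or as the `clickhouse` user):

``` bash
sudo ./clickhouse install   # installs the binary, default configs, and creates the clickhouse user
sudo clickhouse start       # starts clickhouse-server
clickhouse-client           # connects to the local server
```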
These builds are not recommended for use in production environments because they are less thoroughly tested, but you can do so at your own risk. They also have only a subset of ClickHouse features available.
diff --git a/docs/en/operations/external-authenticators/ldap.md b/docs/en/operations/external-authenticators/ldap.md
index 805d45e1b38..5a3db6faf55 100644
--- a/docs/en/operations/external-authenticators/ldap.md
+++ b/docs/en/operations/external-authenticators/ldap.md
@@ -56,13 +56,13 @@ Note, that you can define multiple LDAP servers inside the `ldap_servers` sectio
- `port` — LDAP server port, default is `636` if `enable_tls` is set to `true`, `389` otherwise.
- `bind_dn` — Template used to construct the DN to bind to.
- The resulting DN will be constructed by replacing all `{user_name}` substrings of the template with the actual user name during each authentication attempt.
-- `user_dn_detection` - Section with LDAP search parameters for detecting the actual user DN of the bound user.
+- `user_dn_detection` — Section with LDAP search parameters for detecting the actual user DN of the bound user (see the example sketch below).
- This is mainly used in search filters for further role mapping when the server is Active Directory. The resulting user DN will be used when replacing `{user_dn}` substrings wherever they are allowed. By default, the user DN is set equal to the bind DN, but once the search is performed, it will be updated to the actual detected user DN value.
- - `base_dn` - Template used to construct the base DN for the LDAP search.
+ - `base_dn` — Template used to construct the base DN for the LDAP search.
- The resulting DN will be constructed by replacing all `{user_name}` and `{bind_dn}` substrings of the template with the actual user name and bind DN during the LDAP search.
- - `scope` - Scope of the LDAP search.
+ - `scope` — Scope of the LDAP search.
- Accepted values are: `base`, `one_level`, `children`, `subtree` (the default).
- - `search_filter` - Template used to construct the search filter for the LDAP search.
+ - `search_filter` — Template used to construct the search filter for the LDAP search.
- The resulting filter will be constructed by replacing all `{user_name}`, `{bind_dn}`, and `{base_dn}` substrings of the template with the actual user name, bind DN, and base DN during the LDAP search.
- Note, that the special characters must be escaped properly in XML.
- `verification_cooldown` — A period of time, in seconds, after a successful bind attempt, during which the user will be assumed to be successfully authenticated for all consecutive requests without contacting the LDAP server.
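For illustration only (not part of this change), a minimal `user_dn_detection` section combining the parameters above might look like the following sketch; the base DN and filter mirror the Active Directory example used elsewhere on this page:

```xml
<user_dn_detection>
    <base_dn>CN=Users,DC=example,DC=com</base_dn>
    <scope>subtree</scope>
    <search_filter>(&amp;(objectClass=user)(sAMAccountName={user_name}))</search_filter>
</user_dn_detection>
```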
@@ -108,7 +108,6 @@ Note, that user `my_user` refers to `my_ldap_server`. This LDAP server must be c
When SQL-driven [Access Control and Account Management](../access-rights.md#access-control) is enabled, users that are authenticated by LDAP servers can also be created using the [CREATE USER](../../sql-reference/statements/create/user.md#create-user-statement) statement.
-
Query:
```sql
diff --git a/docs/en/sql-reference/aggregate-functions/reference/sumcount.md b/docs/en/sql-reference/aggregate-functions/reference/sumcount.md
index 80e87663f89..b2cb2cfdc09 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/sumcount.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/sumcount.md
@@ -4,7 +4,7 @@ toc_priority: 144
# sumCount {#agg_function-sumCount}
-Calculates the sum of the numbers and counts the number of rows at the same time.
+Calculates the sum of the numbers and counts the number of rows at the same time. The function is used by the ClickHouse query optimizer: if there are multiple `sum`, `count` or `avg` functions in a query, they can be replaced with a single `sumCount` function to reuse the calculations. It is rarely necessary to use this function explicitly.
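As a rough illustration (not part of this patch), the first query below contains several aggregates over the same column that the optimizer may serve with a single internal `sumCount` pass; the second calls the function explicitly and returns a tuple of the sum and the count:

``` sql
-- Several aggregates over the same column; the optimizer may fuse them.
SELECT sum(number), count(number), avg(number) FROM numbers(10);

-- The explicit call returns (sum, count), here (45, 10).
SELECT sumCount(number) FROM numbers(10);
```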
**Syntax**
diff --git a/docs/en/sql-reference/aggregate-functions/reference/sumkahan.md b/docs/en/sql-reference/aggregate-functions/reference/sumkahan.md
new file mode 100644
index 00000000000..1f2b07f692b
--- /dev/null
+++ b/docs/en/sql-reference/aggregate-functions/reference/sumkahan.md
@@ -0,0 +1,40 @@
+---
+toc_priority: 145
+---
+
+# sumKahan {#agg_function-sumKahan}
+
+Calculates the sum of the numbers with the [Kahan compensated summation algorithm](https://en.wikipedia.org/wiki/Kahan_summation_algorithm).
+It is slower than the [sum](./sum.md) function.
+The compensation works only for [Float](../../../sql-reference/data-types/float.md) types.
+
+
+**Syntax**
+
+``` sql
+sumKahan(x)
+```
+
+**Arguments**
+
+- `x` — Input value, must be [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md).
+
+**Returned value**
+
+- The sum of the numbers, with type [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md) depending on the type of the input arguments.
+
+**Example**
+
+Query:
+
+``` sql
+SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
+```
+
+Result:
+
+``` text
+┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
+│ 0.9999999999999999 │ 1 │
+└────────────────────┴───────────────┘
+```
\ No newline at end of file
diff --git a/docs/en/sql-reference/aggregate-functions/reference/topkweighted.md b/docs/en/sql-reference/aggregate-functions/reference/topkweighted.md
index 8562336c829..694cbd1ad41 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/topkweighted.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/topkweighted.md
@@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
-Similar to `topK` but takes one additional argument of integer type - `weight`. Every value is accounted `weight` times for frequency calculation.
+Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves). Additionally, the weight of the value is taken into account.
**Syntax**
@@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Arguments**
- `N` — The number of elements to return.
-
-**Arguments**
-
- `x` — The value.
-- `weight` — The weight. [UInt8](../../../sql-reference/data-types/int-uint.md).
+- `weight` — The weight. Every value is counted `weight` times for the frequency calculation. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned value**
@@ -40,3 +37,7 @@ Result:
│ [999,998,997,996,995,994,993,992,991,990] │
└───────────────────────────────────────────┘
```
+
+**See Also**
+
+- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)
diff --git a/docs/ru/operations/external-authenticators/ldap.md b/docs/ru/operations/external-authenticators/ldap.md
index 312020000ea..8df59cdfdad 100644
--- a/docs/ru/operations/external-authenticators/ldap.md
+++ b/docs/ru/operations/external-authenticators/ldap.md
@@ -1,4 +1,4 @@
-# LDAP {#external-authenticators-ldap}
+# LDAP {#external-authenticators-ldap}
Для аутентификации пользователей ClickHouse можно использовать сервер LDAP. Существуют два подхода:
@@ -17,6 +17,7 @@
+
    <host>localhost</host>
    <port>636</port>
@@ -31,6 +32,18 @@
    <tls_ca_cert_dir>/path/to/tls_ca_cert_dir</tls_ca_cert_dir>
    <tls_cipher_suite>ECDHE-ECDSA-AES256-GCM-SHA384:ECDHE-RSA-AES256-GCM-SHA384:AES256-GCM-SHA384</tls_cipher_suite>
</my_ldap_server>
+
+<!-- Типичный сервер Active Directory с секцией user_dn_detection для дальнейшего сопоставления ролей. -->
+<my_ad_server>
+    <host>localhost</host>
+    <port>389</port>
+    <bind_dn>EXAMPLE\{user_name}</bind_dn>
+    <user_dn_detection>
+        <base_dn>CN=Users,DC=example,DC=com</base_dn>
+        <search_filter>(&amp;(objectClass=user)(sAMAccountName={user_name}))</search_filter>
+    </user_dn_detection>
+    <enable_tls>no</enable_tls>
+</my_ad_server>
```
@@ -41,9 +54,18 @@
- `host` — имя хоста сервера LDAP или его IP. Этот параметр обязательный и не может быть пустым.
- `port` — порт сервера LDAP. Если настройка `enable_tls` равна `true`, то по умолчанию используется порт `636`, иначе — порт `389`.
-- `bind_dn` — шаблон для создания DN для привязки.
+- `bind_dn` — шаблон для создания DN подключения.
- При формировании DN все подстроки `{user_name}` в шаблоне будут заменяться на фактическое имя пользователя при каждой попытке аутентификации.
-- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки привязки, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
+- `user_dn_detection` — секция с параметрами LDAP поиска для определения фактического значения DN подключенного пользователя.
+ - Это в основном используется в фильтрах поиска для дальнейшего сопоставления ролей, когда сервер является Active Directory. Полученный DN пользователя будет использоваться при замене подстрок `{user_dn}` везде, где они разрешены. По умолчанию DN пользователя устанавливается равным DN подключения, но после выполнения поиска он будет обновлен до фактического найденного значения DN пользователя.
+ - `base_dn` — шаблон для создания базового DN для LDAP поиска.
+ - При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN подключения соответственно при каждом LDAP поиске.
+ - `scope` — область LDAP поиска.
+ - Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
+ - `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
+ - При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
+ - Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
+- `verification_cooldown` — промежуток времени (в секундах) после успешной попытки подключения, в течение которого пользователь будет считаться аутентифицированным и сможет выполнять запросы без повторного обращения к серверам LDAP.
- Чтобы отключить кеширование и заставить обращаться к серверу LDAP для каждого запроса аутентификации, укажите `0` (значение по умолчанию).
- `enable_tls` — флаг, включающий использование защищенного соединения с сервером LDAP.
- Укажите `no` для использования текстового протокола `ldap://` (не рекомендовано).
@@ -106,7 +128,7 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
-
+
    <server>my_ldap_server</server>
@@ -121,6 +143,18 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
        <prefix>clickhouse_</prefix>
    </role_mapping>
</ldap>
+
+<ldap>
+    <server>my_ad_server</server>
+    <role_mapping>
+        <base_dn>CN=Users,DC=example,DC=com</base_dn>
+        <attribute>CN</attribute>
+        <scope>subtree</scope>
+        <search_filter>(&amp;(objectClass=group)(member={user_dn}))</search_filter>
+        <prefix>clickhouse_</prefix>
+    </role_mapping>
+</ldap>
```
@@ -135,14 +169,14 @@ CREATE USER my_user IDENTIFIED WITH ldap SERVER 'my_ldap_server';
- `role_mapping` — секция c параметрами LDAP поиска и правилами отображения.
- При аутентификации пользователя, пока еще связанного с LDAP, производится LDAP поиск с помощью `search_filter` и имени этого пользователя. Для каждой записи, найденной в ходе поиска, выделяется значение указанного атрибута. У каждого атрибута, имеющего указанный префикс, этот префикс удаляется, а остальная часть значения становится именем локальной роли, определенной в ClickHouse, причем предполагается, что эта роль была ранее создана запросом [CREATE ROLE](../../sql-reference/statements/create/role.md#create-role-statement) до этого.
- Внутри одной секции `ldap` может быть несколько секций `role_mapping`. Все они будут применены.
- - `base_dn` — шаблон, который используется для создания базового DN для LDAP поиска.
- - При формировании DN все подстроки `{user_name}` и `{bind_dn}` в шаблоне будут заменяться на фактическое имя пользователя и DN привязки соответственно при каждом LDAP поиске.
- - `scope` — Область LDAP поиска.
+ - `base_dn` — шаблон для создания базового DN для LDAP поиска.
+ - При формировании DN все подстроки `{user_name}`, `{bind_dn}` и `{user_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения и DN пользователя соответственно при каждом LDAP поиске.
+ - `scope` — область LDAP поиска.
- Возможные значения: `base`, `one_level`, `children`, `subtree` (по умолчанию).
- - `search_filter` — шаблон, который используется для создания фильтра для каждого LDAP поиска.
- - при формировании фильтра все подстроки `{user_name}`, `{bind_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN привязки и базовый DN соответственно при каждом LDAP поиске.
+ - `search_filter` — шаблон для создания фильтра для каждого LDAP поиска.
+ - При формировании фильтра все подстроки `{user_name}`, `{bind_dn}`, `{user_dn}` и `{base_dn}` в шаблоне будут заменяться на фактическое имя пользователя, DN подключения, DN пользователя и базовый DN соответственно при каждом LDAP поиске.
- Обратите внимание, что специальные символы должны быть правильно экранированы в XML.
- - `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском.
+ - `attribute` — имя атрибута, значение которого будет возвращаться LDAP поиском. По умолчанию: `cn`.
- `prefix` — префикс, который, как предполагается, будет находиться перед началом каждой строки в исходном списке строк, возвращаемых LDAP поиском. Префикс будет удален из исходных строк, а сами они будут рассматриваться как имена локальных ролей. По умолчанию: пустая строка.
[Оригинальная статья](https://clickhouse.tech/docs/en/operations/external-authenticators/ldap)
diff --git a/docs/ru/sql-reference/aggregate-functions/reference/sumkahan.md b/docs/ru/sql-reference/aggregate-functions/reference/sumkahan.md
new file mode 100644
index 00000000000..cdc713d5726
--- /dev/null
+++ b/docs/ru/sql-reference/aggregate-functions/reference/sumkahan.md
@@ -0,0 +1,39 @@
+---
+toc_priority: 145
+---
+
+# sumKahan {#agg_function-sumKahan}
+
+Вычисляет сумму с использованием [компенсационного суммирования по алгоритму Кэхэна](https://ru.wikipedia.org/wiki/Алгоритм_Кэхэна).
+Работает медленнее функции [sum](./sum.md).
+Компенсация работает только для [Float](../../../sql-reference/data-types/float.md) типов.
+
+**Синтаксис**
+
+``` sql
+sumKahan(x)
+```
+
+**Аргументы**
+
+- `x` — Входное значение типа [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), или [Decimal](../../../sql-reference/data-types/decimal.md).
+
+**Возвращаемое значение**
+
+- Сумма чисел с типом [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md) или [Decimal](../../../sql-reference/data-types/decimal.md), зависящим от типа входных аргументов.
+
+**Пример**
+
+Запрос:
+
+``` sql
+SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
+```
+
+Результат:
+
+``` text
+┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
+│ 0.9999999999999999 │ 1 │
+└────────────────────┴───────────────┘
+```
\ No newline at end of file
diff --git a/docs/ru/sql-reference/aggregate-functions/reference/topkweighted.md b/docs/ru/sql-reference/aggregate-functions/reference/topkweighted.md
index 840f9c553f5..d0fd3856b24 100644
--- a/docs/ru/sql-reference/aggregate-functions/reference/topkweighted.md
+++ b/docs/ru/sql-reference/aggregate-functions/reference/topkweighted.md
@@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
-Аналогична `topK`, но дополнительно принимает положительный целочисленный параметр `weight`. Каждое значение учитывается `weight` раз при расчёте частоты.
+Возвращает массив наиболее часто встречающихся значений в указанном столбце. Результирующий массив упорядочен по убыванию частоты значения (не по самим значениям). Дополнительно учитывается вес значения.
**Синтаксис**
@@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Аргументы**
- `N` — количество элементов для выдачи.
-
-**Аргументы**
-
- `x` — значение.
-- `weight` — вес. [UInt8](../../../sql-reference/data-types/int-uint.md).
+- `weight` — вес. Каждое значение учитывается `weight` раз при расчёте частоты. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Возвращаемое значение**
@@ -41,3 +38,6 @@ SELECT topKWeighted(10)(number, number) FROM numbers(1000)
└───────────────────────────────────────────┘
```
+**Смотрите также**
+
+- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)
diff --git a/docs/ru/sql-reference/statements/grant.md b/docs/ru/sql-reference/statements/grant.md
index 093e6eb3b93..05ffaa22bbd 100644
--- a/docs/ru/sql-reference/statements/grant.md
+++ b/docs/ru/sql-reference/statements/grant.md
@@ -319,13 +319,12 @@ GRANT INSERT(x,y) ON db.table TO john
Разрешает выполнять запросы [DROP](misc.md#drop) и [DETACH](misc.md#detach-statement) в соответствии со следующей иерархией привилегий:
-- `DROP`. Уровень:
+- `DROP`. Уровень: `GROUP`
- `DROP DATABASE`. Уровень: `DATABASE`
- `DROP TABLE`. Уровень: `TABLE`
- `DROP VIEW`. Уровень: `VIEW`
- `DROP DICTIONARY`. Уровень: `DICTIONARY`
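Например, набросок запроса только для иллюстрации (имя пользователя взято из примера выше):

``` sql
GRANT DROP TABLE ON db.* TO john
```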
-
### TRUNCATE {#grant-truncate}
Разрешает выполнять запросы [TRUNCATE](../../sql-reference/statements/truncate.md).
diff --git a/docs/ru/sql-reference/table-functions/s3.md b/docs/ru/sql-reference/table-functions/s3.md
index e062e59c67c..5b54940e830 100644
--- a/docs/ru/sql-reference/table-functions/s3.md
+++ b/docs/ru/sql-reference/table-functions/s3.md
@@ -122,14 +122,14 @@ FROM s3('https://storage.yandexcloud.net/my-test-bucket-768/big_prefix/file-{000
Запишем данные в файл `test-data.csv.gz`:
``` sql
-INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
+INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
VALUES ('test-data', 1), ('test-data-2', 2);
```
Запишем данные из существующей таблицы в файл `test-data.csv.gz`:
``` sql
-INSERT INTO s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
+INSERT INTO FUNCTION s3('https://storage.yandexcloud.net/my-test-bucket-768/test-data.csv.gz', 'CSV', 'name String, value UInt32', 'gzip')
SELECT name, value FROM existing_table;
```
diff --git a/docs/zh/engines/table-engines/integrations/hdfs.md b/docs/zh/engines/table-engines/integrations/hdfs.md
index 8d35dfeeb95..1a6ba0ba9e9 100644
--- a/docs/zh/engines/table-engines/integrations/hdfs.md
+++ b/docs/zh/engines/table-engines/integrations/hdfs.md
@@ -1,27 +1,24 @@
---
-machine_translated: true
-machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 36
toc_title: HDFS
---
# HDFS {#table_engines-hdfs}
-该引擎提供了集成 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统通过允许管理数据 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html)通过ClickHouse. 这个引擎是相似的
-到 [文件](../special/file.md#table_engines-file) 和 [URL](../special/url.md#table_engines-url) 引擎,但提供Hadoop特定的功能。
+这个引擎提供了与 [Apache Hadoop](https://en.wikipedia.org/wiki/Apache_Hadoop) 生态系统的集成,允许通过 ClickHouse 管理 [HDFS](https://hadoop.apache.org/docs/current/hadoop-project-dist/hadoop-hdfs/HdfsDesign.html) 上的数据。这个引擎类似于
+[文件](../../../engines/table-engines/special/file.md#table_engines-file) 和 [URL](../../../engines/table-engines/special/url.md#table_engines-url) 引擎,但提供了 Hadoop 的特定功能。
-## 用途 {#usage}
+## 用法 {#usage}
``` sql
ENGINE = HDFS(URI, format)
```
-该 `URI` 参数是HDFS中的整个文件URI。
-该 `format` 参数指定一种可用的文件格式。 执行
-`SELECT` 查询时,格式必须支持输入,并执行
-`INSERT` queries – for output. The available formats are listed in the
-[格式](../../../interfaces/formats.md#formats) 科。
-路径部分 `URI` 可能包含水珠。 在这种情况下,表将是只读的。
+`URI` 参数是 HDFS 中整个文件的 URI。
+`format` 参数指定一种可用的文件格式。执行
+`SELECT` 查询时,格式必须支持输入;执行
+`INSERT` 查询时,格式必须支持输出。可用的格式列在 [格式](../../../interfaces/formats.md#formats) 章节中。
+`URI` 的路径部分可以包含 glob 通配符。在这种情况下,表将是只读的。
**示例:**
@@ -58,20 +55,20 @@ SELECT * FROM hdfs_engine_table LIMIT 2
- 索引。
- 复制。
-**路径中的水珠**
+**路径中的通配符**
-多个路径组件可以具有globs。 对于正在处理的文件应该存在并匹配到整个路径模式。 文件列表确定在 `SELECT` (不在 `CREATE` 时刻)。
+多个路径组件可以包含 glob 通配符。被处理的文件必须存在,并且与完整的路径模式匹配。文件列表是在 `SELECT` 的时候确定的(而不是在 `CREATE` 的时候)。
-- `*` — Substitutes any number of any characters except `/` 包括空字符串。
-- `?` — Substitutes any single character.
-- `{some_string,another_string,yet_another_one}` — Substitutes any of strings `'some_string', 'another_string', 'yet_another_one'`.
-- `{N..M}` — Substitutes any number in range from N to M including both borders.
+- `*` — 替代任意数量的任意字符(`/` 除外),包括空字符串。
+- `?` — 替代任意单个字符。
+- `{some_string,another_string,yet_another_one}` — 替代 `'some_string', 'another_string', 'yet_another_one'` 中的任意一个字符串。
+- `{N..M}` — 替代 N 到 M 范围内的任意数字,包括两个边界值。
-建筑与 `{}` 类似于 [远程](../../../sql-reference/table-functions/remote.md) 表功能。
+带 `{}` 的结构类似于 [远程](../../../sql-reference/table-functions/remote.md) 表函数。
**示例**
-1. 假设我们在HDFS上有几个TSV格式的文件,其中包含以下Uri:
+1. 假设我们在 HDFS 上有几个 TSV 格式的文件,文件的 URI 如下:
- ‘hdfs://hdfs1:9000/some_dir/some_file_1’
- ‘hdfs://hdfs1:9000/some_dir/some_file_2’
@@ -111,10 +108,98 @@ CREATE TABLE table_with_asterisk (name String, value UInt32) ENGINE = HDFS('hdfs
CREATE TABLE big_table (name String, value UInt32) ENGINE = HDFS('hdfs://hdfs1:9000/big_dir/file{0..9}{0..9}{0..9}', 'CSV')
```
+## 配置 {#configuration}
+
+与 GraphiteMergeTree 类似,HDFS 引擎支持使用 ClickHouse 配置文件进行扩展配置。有两个可用的配置键:全局 (`hdfs`) 和用户级别 (`hdfs_*`)。首先应用全局配置,然后应用用户级别配置(如果存在)。
+
+``` xml
+
+  <hdfs>
+    <hadoop_kerberos_keytab>/tmp/keytab/clickhouse.keytab</hadoop_kerberos_keytab>
+    <hadoop_kerberos_principal>clickuser@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
+    <hadoop_security_authentication>kerberos</hadoop_security_authentication>
+  </hdfs>
+
+  <hdfs_root>
+    <hadoop_kerberos_principal>root@TEST.CLICKHOUSE.TECH</hadoop_kerberos_principal>
+  </hdfs_root>
+
+```
+
+### 可选配置选项及其默认值的列表
+#### libhdfs3 支持的
+
+
+| **参数** | **默认值** |
+| -------- | ---------- |
+| rpc\_client\_connect\_tcpnodelay | true |
+| dfs\_client\_read\_shortcircuit | true |
+| output\_replace-datanode-on-failure | true |
+| input\_notretry-another-node | false |
+| input\_localread\_mappedfile | true |
+| dfs\_client\_use\_legacy\_blockreader\_local | false |
+| rpc\_client\_ping\_interval | 10 * 1000 |
+| rpc\_client\_connect\_timeout | 600 * 1000 |
+| rpc\_client\_read\_timeout | 3600 * 1000 |
+| rpc\_client\_write\_timeout | 3600 * 1000 |
+| rpc\_client\_socekt\_linger\_timeout | -1 |
+| rpc\_client\_connect\_retry | 10 |
+| rpc\_client\_timeout | 3600 * 1000 |
+| dfs\_default\_replica | 3 |
+| input\_connect\_timeout | 600 * 1000 |
+| input\_read\_timeout | 3600 * 1000 |
+| input\_write\_timeout | 3600 * 1000 |
+| input\_localread\_default\_buffersize | 1 * 1024 * 1024 |
+| dfs\_prefetchsize | 10 |
+| input\_read\_getblockinfo\_retry | 3 |
+| input\_localread\_blockinfo\_cachesize | 1000 |
+| input\_read\_max\_retry | 60 |
+| output\_default\_chunksize | 512 |
+| output\_default\_packetsize | 64 * 1024 |
+| output\_default\_write\_retry | 10 |
+| output\_connect\_timeout | 600 * 1000 |
+| output\_read\_timeout | 3600 * 1000 |
+| output\_write\_timeout | 3600 * 1000 |
+| output\_close\_timeout | 3600 * 1000 |
+| output\_packetpool\_size | 1024 |
+| output\_heeartbeat\_interval | 10 * 1000 |
+| dfs\_client\_failover\_max\_attempts | 15 |
+| dfs\_client\_read\_shortcircuit\_streams\_cache\_size | 256 |
+| dfs\_client\_socketcache\_expiryMsec | 3000 |
+| dfs\_client\_socketcache\_capacity | 16 |
+| dfs\_default\_blocksize | 64 * 1024 * 1024 |
+| dfs\_default\_uri | "hdfs://localhost:9000" |
+| hadoop\_security\_authentication | "simple" |
+| hadoop\_security\_kerberos\_ticket\_cache\_path | "" |
+| dfs\_client\_log\_severity | "INFO" |
+| dfs\_domain\_socket\_path | "" |
+
+
+[HDFS 配置参考](https://hawq.apache.org/docs/userguide/2.3.0.0-incubating/reference/HDFSConfigurationParameterReference.html) 也许会解释一些参数的含义.
+
+#### ClickHouse 额外的配置 {#clickhouse-extras}
+
+| **参数** | **默认值** |
+| -------- | ---------- |
+| hadoop\_kerberos\_keytab | "" |
+| hadoop\_kerberos\_principal | "" |
+| hadoop\_kerberos\_kinit\_command | kinit |
+
+#### 限制 {#limitations}
+  * hadoop\_security\_kerberos\_ticket\_cache\_path 只能在全局配置中设置,不能针对单个用户指定。
+
+## Kerberos 支持 {#kerberos-support}
+
+如果 hadoop\_security\_authentication 参数的值为 'kerberos',ClickHouse 将通过 Kerberos 进行认证。
+[这里](#clickhouse-extras)列出的参数以及 hadoop\_security\_kerberos\_ticket\_cache\_path 也许会有帮助。
+注意,由于 libhdfs3 的限制,只支持老式的方法:
+数据节点的安全通信无法由 SASL 保证(HADOOP\_SECURE\_DN\_USER 是这种安全方法的一个可靠指标)。
+可以使用 tests/integration/test\_storage\_kerberized\_hdfs/hdfs_configs/bootstrap.sh 脚本作为参考。
+
+如果指定了 hadoop\_kerberos\_keytab、hadoop\_kerberos\_principal 或者 hadoop\_kerberos\_kinit\_command,将会调用 kinit 工具。在此情况下,hadoop\_kerberos\_keytab 和 hadoop\_kerberos\_principal 参数是必须配置的,并且需要 kinit 工具和 krb5 配置文件。
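下面是一个仅供参考的示例片段(并非本补丁的内容;keytab 路径与 principal 均为假设值),展示 kinit 方式所需的参数组合:

``` xml
<hdfs>
    <!-- 以下取值均为假设,请替换为实际环境中的 keytab 与 principal -->
    <hadoop_kerberos_keytab>/path/to/clickhouse.keytab</hadoop_kerberos_keytab>
    <hadoop_kerberos_principal>clickuser@EXAMPLE.COM</hadoop_kerberos_principal>
    <hadoop_kerberos_kinit_command>kinit</hadoop_kerberos_kinit_command>
</hdfs>
```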
+
## 虚拟列 {#virtual-columns}
-- `_path` — Path to the file.
-- `_file` — Name of the file.
+- `_path` — 文件路径.
+- `_file` — 文件名.
**另请参阅**
diff --git a/programs/client/Client.cpp b/programs/client/Client.cpp
index f268d2b5cdc..c9cd02d4e94 100644
--- a/programs/client/Client.cpp
+++ b/programs/client/Client.cpp
@@ -1336,7 +1336,7 @@ private:
fmt::print(
stderr,
- "IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
+ "Found error: IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
exit(1);
}
@@ -1461,7 +1461,7 @@ private:
const auto text_3 = ast_3->formatForErrorMessage();
if (text_3 != text_2)
{
- fmt::print(stderr, "The query formatting is broken.\n");
+ fmt::print(stderr, "Found error: The query formatting is broken.\n");
printChangedSettings();
diff --git a/programs/client/QueryFuzzer.cpp b/programs/client/QueryFuzzer.cpp
index 721e5acb991..438a8cab819 100644
--- a/programs/client/QueryFuzzer.cpp
+++ b/programs/client/QueryFuzzer.cpp
@@ -325,14 +325,14 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
// the generic recursion into IAST.children.
}
-void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
+void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
{
switch (fuzz_rand() % 40)
{
case 0:
{
const auto r = fuzz_rand() % 3;
- frame.type = r == 0 ? WindowFrame::FrameType::Rows
+ def.frame_type = r == 0 ? WindowFrame::FrameType::Rows
: r == 1 ? WindowFrame::FrameType::Range
: WindowFrame::FrameType::Groups;
break;
@@ -340,44 +340,65 @@ void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
case 1:
{
const auto r = fuzz_rand() % 3;
- frame.begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
+ def.frame_begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
+
+ if (def.frame_begin_type == WindowFrame::BoundaryType::Offset)
+ {
+ // The offsets are fuzzed normally through 'children'.
+ def.frame_begin_offset
+                    = std::make_shared<ASTLiteral>(getRandomField(0));
+ }
+ else
+ {
+ def.frame_begin_offset = nullptr;
+ }
break;
}
case 2:
{
const auto r = fuzz_rand() % 3;
- frame.end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
+ def.frame_end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
- break;
- }
- case 3:
- {
- frame.begin_offset = getRandomField(0).get();
- break;
- }
- case 4:
- {
- frame.end_offset = getRandomField(0).get();
+
+ if (def.frame_end_type == WindowFrame::BoundaryType::Offset)
+ {
+ def.frame_end_offset
+                    = std::make_shared<ASTLiteral>(getRandomField(0));
+ }
+ else
+ {
+ def.frame_end_offset = nullptr;
+ }
break;
}
case 5:
{
- frame.begin_preceding = fuzz_rand() % 2;
+ def.frame_begin_preceding = fuzz_rand() % 2;
break;
}
case 6:
{
- frame.end_preceding = fuzz_rand() % 2;
+ def.frame_end_preceding = fuzz_rand() % 2;
break;
}
default:
break;
}
- frame.is_default = (frame == WindowFrame{});
+ if (def.frame_type == WindowFrame::FrameType::Range
+ && def.frame_begin_type == WindowFrame::BoundaryType::Unbounded
+ && def.frame_begin_preceding
+ && def.frame_end_type == WindowFrame::BoundaryType::Current)
+ {
+ def.frame_is_default = true; /* NOLINT clang-tidy could you just shut up please */
+ }
+ else
+ {
+ def.frame_is_default = false;
+ }
}
void QueryFuzzer::fuzz(ASTs & asts)
@@ -464,7 +485,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
auto & def = fn->window_definition->as<ASTWindowDefinition &>();
fuzzColumnLikeExpressionList(def.partition_by.get());
fuzzOrderByList(def.order_by.get());
- fuzzWindowFrame(def.frame);
+ fuzzWindowFrame(def);
}
fuzz(fn->children);
diff --git a/programs/client/QueryFuzzer.h b/programs/client/QueryFuzzer.h
index 7c79e683eb4..19f089c6c4e 100644
--- a/programs/client/QueryFuzzer.h
+++ b/programs/client/QueryFuzzer.h
@@ -17,7 +17,7 @@ namespace DB
class ASTExpressionList;
class ASTOrderByElement;
-struct WindowFrame;
+struct ASTWindowDefinition;
/*
* This is an AST-based query fuzzer that makes random modifications to query
@@ -69,7 +69,7 @@ struct QueryFuzzer
void fuzzOrderByElement(ASTOrderByElement * elem);
void fuzzOrderByList(IAST * ast);
void fuzzColumnLikeExpressionList(IAST * ast);
- void fuzzWindowFrame(WindowFrame & frame);
+ void fuzzWindowFrame(ASTWindowDefinition & def);
void fuzz(ASTs & asts);
void fuzz(ASTPtr & ast);
void collectFuzzInfoMain(const ASTPtr ast);
diff --git a/programs/copier/ClusterCopier.cpp b/programs/copier/ClusterCopier.cpp
index a60896388a0..6863c6e7c19 100644
--- a/programs/copier/ClusterCopier.cpp
+++ b/programs/copier/ClusterCopier.cpp
@@ -1,12 +1,15 @@
#include "ClusterCopier.h"
#include "Internals.h"
+#include "StatusAccumulator.h"
#include
#include
#include
#include
-
+#include
+#include
+#include
namespace DB
{
@@ -29,17 +32,16 @@ void ClusterCopier::init()
if (response.error != Coordination::Error::ZOK)
return;
UInt64 version = ++task_description_version;
- LOG_DEBUG(log, "Task description should be updated, local version {}", version);
+ LOG_INFO(log, "Task description should be updated, local version {}", version);
};
task_description_path = task_zookeeper_path + "/description";
task_cluster = std::make_unique(task_zookeeper_path, working_database_name);
reloadTaskDescription();
- task_cluster_initial_config = task_cluster_current_config;
- task_cluster->loadTasks(*task_cluster_initial_config);
- getContext()->setClustersConfig(task_cluster_initial_config, task_cluster->clusters_prefix);
+ task_cluster->loadTasks(*task_cluster_current_config);
+ getContext()->setClustersConfig(task_cluster_current_config, task_cluster->clusters_prefix);
/// Set up shards and their priority
task_cluster->random_engine.seed(task_cluster->random_device());
@@ -50,12 +52,14 @@ void ClusterCopier::init()
task_table.initShards(task_cluster->random_engine);
}
- LOG_DEBUG(log, "Will process {} table tasks", task_cluster->table_tasks.size());
+ LOG_INFO(log, "Will process {} table tasks", task_cluster->table_tasks.size());
/// Do not initialize tables, will make deferred initialization in process()
zookeeper->createAncestors(getWorkersPathVersion() + "/");
zookeeper->createAncestors(getWorkersPath() + "/");
+ /// Init status node
+ zookeeper->createIfNotExists(task_zookeeper_path + "/status", "{}");
}
template
@@ -138,7 +142,7 @@ void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts,
{
if (!task_table.enabled_partitions_set.count(partition_name))
{
- LOG_DEBUG(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
+ LOG_INFO(log, "Partition {} will not be processed, since it is not in enabled_partitions of {}", partition_name, task_table.table_id);
}
}
}
@@ -173,7 +177,7 @@ void ClusterCopier::discoverShardPartitions(const ConnectionTimeouts & timeouts,
LOG_WARNING(log, "There are no {} partitions from enabled_partitions in shard {} :{}", missing_partitions.size(), task_shard->getDescription(), ss.str());
}
- LOG_DEBUG(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
+ LOG_INFO(log, "Will copy {} partitions from shard {}", task_shard->partition_tasks.size(), task_shard->getDescription());
}
void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts, TaskTable & task_table, UInt64 num_threads)
@@ -189,7 +193,7 @@ void ClusterCopier::discoverTablePartitions(const ConnectionTimeouts & timeouts,
discoverShardPartitions(timeouts, task_shard);
});
- LOG_DEBUG(log, "Waiting for {} setup jobs", thread_pool.active());
+ LOG_INFO(log, "Waiting for {} setup jobs", thread_pool.active());
thread_pool.wait();
}
}
@@ -213,7 +217,7 @@ void ClusterCopier::uploadTaskDescription(const std::string & task_path, const s
if (code != Coordination::Error::ZOK && force)
zookeeper->createOrUpdate(local_task_description_path, task_config_str, zkutil::CreateMode::Persistent);
- LOG_DEBUG(log, "Task description {} uploaded to {} with result {} ({})",
+ LOG_INFO(log, "Task description {} uploaded to {} with result {} ({})",
((code != Coordination::Error::ZOK && !force) ? "not " : ""), local_task_description_path, code, Coordination::errorMessage(code));
}
@@ -222,23 +226,17 @@ void ClusterCopier::reloadTaskDescription()
auto zookeeper = getContext()->getZooKeeper();
task_description_watch_zookeeper = zookeeper;
- String task_config_str;
Coordination::Stat stat{};
- Coordination::Error code;
- zookeeper->tryGetWatch(task_description_path, task_config_str, &stat, task_description_watch_callback, &code);
- if (code != Coordination::Error::ZOK)
- throw Exception("Can't get description node " + task_description_path, ErrorCodes::BAD_ARGUMENTS);
+    /// It will throw an exception if such a node does not exist.
+ auto task_config_str = zookeeper->get(task_description_path, &stat);
- LOG_DEBUG(log, "Loading description, zxid={}", task_description_current_stat.czxid);
- auto config = getConfigurationFromXMLString(task_config_str);
+ LOG_INFO(log, "Loading task description");
+ task_cluster_current_config = getConfigurationFromXMLString(task_config_str);
/// Setup settings
- task_cluster->reloadSettings(*config);
+ task_cluster->reloadSettings(*task_cluster_current_config);
getContext()->setSettings(task_cluster->settings_common);
-
- task_cluster_current_config = config;
- task_description_current_stat = stat;
}
void ClusterCopier::updateConfigIfNeeded()
@@ -250,7 +248,7 @@ void ClusterCopier::updateConfigIfNeeded()
if (!is_outdated_version && !is_expired_session)
return;
- LOG_DEBUG(log, "Updating task description");
+ LOG_INFO(log, "Updating task description");
reloadTaskDescription();
task_description_current_version = version_to_update;
@@ -361,7 +359,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
if (static_cast(stat.numChildren) >= task_cluster->max_workers)
{
- LOG_DEBUG(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
+ LOG_INFO(log, "Too many workers ({}, maximum {}). Postpone processing {}", stat.numChildren, task_cluster->max_workers, description);
if (unprioritized)
current_sleep_time = std::min(max_sleep_time, current_sleep_time + default_sleep_time);
@@ -387,7 +385,7 @@ zkutil::EphemeralNodeHolder::Ptr ClusterCopier::createTaskWorkerNodeAndWaitIfNee
/// Try to make fast retries
if (num_bad_version_errors > 3)
{
- LOG_DEBUG(log, "A concurrent worker has just been added, will check free worker slots again");
+ LOG_INFO(log, "A concurrent worker has just been added, will check free worker slots again");
std::chrono::milliseconds random_sleep_time(std::uniform_int_distribution<int>(1, 1000)(task_cluster->random_engine));
std::this_thread::sleep_for(random_sleep_time);
num_bad_version_errors = 0;
@@ -422,7 +420,7 @@ bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_tabl
{
bool piece_is_done = checkPartitionPieceIsDone(task_table, partition_name, piece_number, shards_with_partition);
if (!piece_is_done)
- LOG_DEBUG(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
+ LOG_INFO(log, "Partition {} piece {} is not already done.", partition_name, piece_number);
answer &= piece_is_done;
}
@@ -438,7 +436,7 @@ bool ClusterCopier::checkAllPiecesInPartitionAreDone(const TaskTable & task_tabl
bool ClusterCopier::checkPartitionPieceIsDone(const TaskTable & task_table, const String & partition_name,
size_t piece_number, const TasksShard & shards_with_partition)
{
- LOG_DEBUG(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
+ LOG_INFO(log, "Check that all shards processed partition {} piece {} successfully", partition_name, piece_number);
auto zookeeper = getContext()->getZooKeeper();
@@ -530,7 +528,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
inject_fault = value < move_fault_probability;
}
- LOG_DEBUG(log, "Try to move {} to destination table", partition_name);
+ LOG_INFO(log, "Try to move {} to destination table", partition_name);
auto zookeeper = getContext()->getZooKeeper();
@@ -548,7 +546,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
- LOG_DEBUG(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
+ LOG_INFO(log, "Someone is already moving pieces {}", current_partition_attach_is_active);
return TaskStatus::Active;
}
@@ -565,13 +563,13 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
- LOG_DEBUG(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
+ LOG_INFO(log, "All pieces for partition from this task {} has been successfully moved to destination table by {}", current_partition_attach_is_active, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
- LOG_DEBUG(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
+ LOG_INFO(log, "Moving piece for partition {} has not been successfully finished by {}. Will try to move by myself.", current_partition_attach_is_active, status.owner);
/// Remove is_done marker.
zookeeper->remove(current_partition_attach_is_done);
@@ -585,10 +583,25 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
zookeeper->create(current_partition_attach_is_done, start_state, zkutil::CreateMode::Persistent);
}
+
+ /// Try to drop destination partition in original table
+ if (task_table.allow_to_drop_target_partitions)
+ {
+ DatabaseAndTableName original_table = task_table.table_push;
+
+ WriteBufferFromOwnString ss;
+ ss << "ALTER TABLE " << getQuotedTable(original_table) << ((partition_name == "'all'") ? " DROP PARTITION ID " : " DROP PARTITION ") << partition_name;
+
+ UInt64 num_shards_drop_partition = executeQueryOnCluster(task_table.cluster_push, ss.str(), task_cluster->settings_push, ClusterExecutionMode::ON_EACH_SHARD);
+
+        LOG_INFO(log, "Drop partition {} in the original table {} has been executed successfully on {} shards of {}",
+ partition_name, getQuotedTable(original_table), num_shards_drop_partition, task_table.cluster_push->getShardCount());
+ }
+
/// Move partition to original destination table.
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
- LOG_DEBUG(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
+ LOG_INFO(log, "Trying to move partition {} piece {} to original table", partition_name, toString(current_piece_number));
ASTPtr query_alter_ast;
String query_alter_ast_string;
@@ -600,18 +613,15 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
Settings settings_push = task_cluster->settings_push;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_NODE;
- UInt64 max_successful_executions_per_shard = 0;
+
if (settings_push.replication_alter_partitions_sync == 1)
- {
execution_mode = ClusterExecutionMode::ON_EACH_SHARD;
- max_successful_executions_per_shard = 1;
- }
query_alter_ast_string += " ALTER TABLE " + getQuotedTable(original_table) +
((partition_name == "'all'") ? " ATTACH PARTITION ID " : " ATTACH PARTITION ") + partition_name +
" FROM " + getQuotedTable(helping_table);
- LOG_DEBUG(log, "Executing ALTER query: {}", query_alter_ast_string);
+ LOG_INFO(log, "Executing ALTER query: {}", query_alter_ast_string);
try
{
@@ -620,9 +630,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
task_table.cluster_push,
query_alter_ast_string,
task_cluster->settings_push,
- PoolMode::GET_MANY,
- execution_mode,
- max_successful_executions_per_shard);
+ execution_mode);
if (settings_push.replication_alter_partitions_sync == 1)
{
@@ -634,9 +642,7 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
task_table.cluster_push->getShardCount());
if (num_nodes != task_table.cluster_push->getShardCount())
- {
return TaskStatus::Error;
- }
}
else
{
@@ -645,50 +651,46 @@ TaskStatus ClusterCopier::tryMoveAllPiecesToDestinationTable(const TaskTable & t
}
catch (...)
{
- LOG_DEBUG(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
+ LOG_INFO(log, "Error while moving partition {} piece {} to original table", partition_name, toString(current_piece_number));
+ LOG_WARNING(log, "In case of non-replicated tables it can cause duplicates.");
throw;
}
if (inject_fault)
throw Exception("Copy fault injection is activated", ErrorCodes::UNFINISHED);
-
- try
- {
- String query_deduplicate_ast_string;
- if (!task_table.isReplicatedTable())
- {
- query_deduplicate_ast_string += " OPTIMIZE TABLE " + getQuotedTable(original_table) +
- ((partition_name == "'all'") ? " PARTITION ID " : " PARTITION ") + partition_name + " DEDUPLICATE;";
-
- LOG_DEBUG(log, "Executing OPTIMIZE DEDUPLICATE query: {}", query_deduplicate_ast_string);
-
- UInt64 num_nodes = executeQueryOnCluster(
- task_table.cluster_push,
- query_deduplicate_ast_string,
- task_cluster->settings_push,
- PoolMode::GET_MANY);
-
- LOG_INFO(log, "Number of shard that executed OPTIMIZE DEDUPLICATE query successfully : {}", toString(num_nodes));
- }
- }
- catch (...)
- {
- LOG_DEBUG(log, "Error while executing OPTIMIZE DEDUPLICATE partition {}in the original table", partition_name);
- throw;
- }
}
/// Create node to signal that we finished moving
{
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
zookeeper->set(current_partition_attach_is_done, state_finished, 0);
+ /// Also increment a counter of processed partitions
+ while (true)
+ {
+ Coordination::Stat stat;
+ auto status_json = zookeeper->get(task_zookeeper_path + "/status", &stat);
+ auto statuses = StatusAccumulator::fromJSON(status_json);
+
+ /// Increment status for table.
+ auto status_for_table = (*statuses)[task_table.name_in_config];
+ status_for_table.processed_partitions_count += 1;
+ (*statuses)[task_table.name_in_config] = status_for_table;
+
+ auto statuses_to_commit = StatusAccumulator::serializeToJSON(statuses);
+ auto error = zookeeper->trySet(task_zookeeper_path + "/status", statuses_to_commit, stat.version, &stat);
+ if (error == Coordination::Error::ZOK)
+ break;
+ }
}
return TaskStatus::Finished;
}
-/// Removes MATERIALIZED and ALIAS columns from create table query
-ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast)
+/// This is needed to create internal Distributed table
+/// Removes column's TTL expression from `CREATE` query
+/// Removes MATERIALIZED and ALIAS columns so as not to copy additional and useless data over the network.
+/// Removes data skipping indices.
+ASTPtr ClusterCopier::removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns)
{
const ASTs & column_asts = query_ast->as().columns_list->columns->children;
auto new_columns = std::make_shared();
@@ -697,14 +699,21 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
{
const auto & column = column_ast->as();
- if (!column.default_specifier.empty())
+ /// Skip ALIAS and MATERIALIZED columns unless copying them is explicitly allowed.
+ if (!column.default_specifier.empty() && !allow_to_copy_alias_and_materialized_columns)
{
ColumnDefaultKind kind = columnDefaultKindFromString(column.default_specifier);
if (kind == ColumnDefaultKind::Materialized || kind == ColumnDefaultKind::Alias)
continue;
}
- new_columns->children.emplace_back(column_ast->clone());
+ /// Remove TTL from the column definition.
+ auto new_column_ast = column_ast->clone();
+ auto & new_column = new_column_ast->as();
+ if (new_column.ttl)
+ new_column.ttl.reset();
+
+ new_columns->children.emplace_back(new_column_ast);
}
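For orientation, a hypothetical column definition before and after this loop (names and TTL expression are made up, not taken from the patch):

    /// Source table column (hypothetical):
    ///     event_date Date TTL event_date + INTERVAL 30 DAY
    /// Column kept for the internal Distributed table, with the TTL clause dropped:
    ///     event_date Date
    /// MATERIALIZED and ALIAS columns are omitted entirely unless
    /// allow_to_copy_alias_and_materialized_columns is set.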
ASTPtr new_query_ast = query_ast->clone();
@@ -712,10 +721,8 @@ ASTPtr ClusterCopier::removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast
auto new_columns_list = std::make_shared();
new_columns_list->set(new_columns_list->columns, new_columns);
- if (const auto * indices = query_ast->as()->columns_list->indices)
- new_columns_list->set(new_columns_list->indices, indices->clone());
- if (const auto * projections = query_ast->as()->columns_list->projections)
- new_columns_list->set(new_columns_list->projections, projections->clone());
+
+ /// Indices and projections are skipped: the Distributed table doesn't support them and they are not needed here.
new_query.replace(new_query.columns_list, new_columns_list);
@@ -739,6 +746,8 @@ std::shared_ptr rewriteCreateQueryStorage(const ASTPtr & create_
res->children.clear();
res->set(res->columns_list, create.columns_list->clone());
res->set(res->storage, new_storage_ast->clone());
+ /// Use IF NOT EXISTS so that we don't have to store an additional flag like `is_table_created` somewhere else.
+ res->if_not_exists = true;
return res;
}
@@ -771,7 +780,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
- LOG_DEBUG(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
+ LOG_INFO(log, "Partition {} piece {} is cleaning now by somebody, sleep", task_partition.name, toString(current_piece_number));
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@@ -784,7 +793,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (stat.numChildren != 0)
{
- LOG_DEBUG(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
+ LOG_INFO(log, "Partition {} contains {} active workers while trying to drop it. Going to sleep.", task_partition.name, stat.numChildren);
std::this_thread::sleep_for(default_sleep_time);
return false;
}
@@ -804,7 +813,7 @@ bool ClusterCopier::tryDropPartitionPiece(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
- LOG_DEBUG(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
+ LOG_INFO(log, "Partition {} is being filled now by somebody, sleep", task_partition.name);
return false;
}
@@ -842,12 +851,11 @@ bool ClusterCopier::tryDropPartitionPiece(
/// It is important, DROP PARTITION must be done synchronously
settings_push.replication_alter_partitions_sync = 2;
- LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
+ LOG_INFO(log, "Execute distributed DROP PARTITION: {}", query);
/// We have to drop partition_piece on each replica
size_t num_shards = executeQueryOnCluster(
cluster_push, query,
settings_push,
- PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP PARTITION was successfully executed on {} nodes of a cluster.", num_shards);
@@ -863,7 +871,7 @@ bool ClusterCopier::tryDropPartitionPiece(
}
else
{
- LOG_DEBUG(log, "Clean state is altered when dropping the partition, cowardly bailing");
+ LOG_INFO(log, "Clean state is altered when dropping the partition, cowardly bailing");
/// clean state is stale
return false;
}
@@ -889,6 +897,31 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
LOG_WARNING(log, "Create destination Tale Failed ");
return false;
}
+
+ /// Set all_partitions_count for the table in ZooKeeper
+ auto zookeeper = getContext()->getZooKeeper();
+ while (true)
+ {
+ Coordination::Stat stat;
+ auto status_json = zookeeper->get(task_zookeeper_path + "/status", &stat);
+ auto statuses = StatusAccumulator::fromJSON(status_json);
+
+ /// Exit if someone already set the initial value for this table.
+ if (statuses->find(task_table.name_in_config) != statuses->end())
+ break;
+ (*statuses)[task_table.name_in_config] = StatusAccumulator::TableStatus
+ {
+ /*all_partitions_count=*/task_table.ordered_partition_names.size(),
+ /*processed_partitions_count=*/0
+ };
+
+ auto statuses_to_commit = StatusAccumulator::serializeToJSON(statuses);
+ auto error = zookeeper->trySet(task_zookeeper_path + "/status", statuses_to_commit, stat.version);
+ if (error == Coordination::Error::ZOK)
+ break;
+ }
+
+
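The value stored at task_zookeeper_path + "/status" is whatever StatusAccumulator::serializeToJSON produces for its Map; the exact JSON layout is an implementation detail, but a plausible shape for a task with one table looks roughly like this (illustrative only, field names taken from StatusAccumulator::TableStatus):

    /// Hypothetical contents of <task_zookeeper_path>/status after this initialization:
    ///     { "destination_hits": { "all_partitions_count": 12, "processed_partitions_count": 0 } }
    /// tryMoveAllPiecesToDestinationTable later bumps processed_partitions_count,
    /// and the new --status mode of clickhouse-copier simply prints this node.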
/// An heuristic: if previous shard is already done, then check next one without sleeps due to max_workers constraint
bool previous_shard_is_instantly_finished = false;
@@ -907,7 +940,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
++cluster_partition.total_tries;
- LOG_DEBUG(log, "Processing partition {} for the whole cluster", partition_name);
+ LOG_INFO(log, "Processing partition {} for the whole cluster", partition_name);
/// Process each source shard having current partition and copy current partition
/// NOTE: shards are sorted by "distance" to current host
@@ -929,7 +962,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
{
const size_t number_of_splits = task_table.number_of_splits;
shard->partition_tasks.emplace(partition_name, ShardPartition(*shard, partition_name, number_of_splits));
- LOG_DEBUG(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
+ LOG_INFO(log, "Discovered partition {} in shard {}", partition_name, shard->getDescription());
/// To save references in the future.
auto shard_partition_it = shard->partition_tasks.find(partition_name);
PartitionPieces & shard_partition_pieces = shard_partition_it->second.pieces;
@@ -942,7 +975,7 @@ bool ClusterCopier::tryProcessTable(const ConnectionTimeouts & timeouts, TaskTab
}
else
{
- LOG_DEBUG(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
+ LOG_INFO(log, "Found that shard {} does not contain current partition {}", shard->getDescription(), partition_name);
continue;
}
}
@@ -1100,18 +1133,14 @@ TaskStatus ClusterCopier::tryCreateDestinationTable(const ConnectionTimeouts & t
InterpreterCreateQuery::prepareOnClusterQuery(create, getContext(), task_table.cluster_push_name);
String query = queryToString(create_query_push_ast);
- LOG_DEBUG(log, "Create destination tables. Query: {}", query);
- UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
+ LOG_INFO(log, "Create destination tables. Query: \n {}", query);
+ UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(
log,
"Destination tables {} have been created on {} shards of {}",
getQuotedTable(task_table.table_push),
shards,
task_table.cluster_push->getShardCount());
- if (shards != task_table.cluster_push->getShardCount())
- {
- return TaskStatus::Error;
- }
}
catch (...)
{
@@ -1226,17 +1255,17 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
auto create_is_dirty_node = [&] (const CleanStateClock & clock)
{
if (clock.is_stale())
- LOG_DEBUG(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
+ LOG_INFO(log, "Clean state clock is stale while setting dirty flag, cowardly bailing");
else if (!clock.is_clean())
- LOG_DEBUG(log, "Thank you, Captain Obvious");
+ LOG_INFO(log, "Thank you, Captain Obvious");
else if (clock.discovery_version)
{
- LOG_DEBUG(log, "Updating clean state clock");
+ LOG_INFO(log, "Updating clean state clock");
zookeeper->set(piece_is_dirty_flag_path, host_id, clock.discovery_version.value());
}
else
{
- LOG_DEBUG(log, "Creating clean state clock");
+ LOG_INFO(log, "Creating clean state clock");
zookeeper->create(piece_is_dirty_flag_path, host_id, zkutil::CreateMode::Persistent);
}
};
@@ -1262,6 +1291,8 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
if (!limit.empty())
query += " LIMIT " + limit;
+ query += "FORMAT Native";
+
ParserQuery p_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@@ -1271,7 +1302,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
/// Load balancing
auto worker_node_holder = createTaskWorkerNodeAndWaitIfNeed(zookeeper, current_task_piece_status_path, is_unprioritized_task);
- LOG_DEBUG(log, "Processing {}", current_task_piece_status_path);
+ LOG_INFO(log, "Processing {}", current_task_piece_status_path);
const String piece_status_path = partition_piece.getPartitionPieceShardsPath();
@@ -1282,12 +1313,12 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
/// Do not start if partition piece is dirty, try to clean it
if (is_clean)
{
- LOG_DEBUG(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
+ LOG_INFO(log, "Partition {} piece {} appears to be clean", task_partition.name, current_piece_number);
zookeeper->createAncestors(current_task_piece_status_path);
}
else
{
- LOG_DEBUG(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
+ LOG_INFO(log, "Partition {} piece {} is dirty, try to drop it", task_partition.name, current_piece_number);
try
{
@@ -1312,7 +1343,7 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
{
if (e.code == Coordination::Error::ZNODEEXISTS)
{
- LOG_DEBUG(log, "Someone is already processing {}", current_task_piece_is_active_path);
+ LOG_INFO(log, "Someone is already processing {}", current_task_piece_is_active_path);
return TaskStatus::Active;
}
@@ -1328,13 +1359,13 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
TaskStateWithOwner status = TaskStateWithOwner::fromString(status_data);
if (status.state == TaskState::Finished)
{
- LOG_DEBUG(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
+ LOG_INFO(log, "Task {} has been successfully executed by {}", current_task_piece_status_path, status.owner);
return TaskStatus::Finished;
}
/// Task is abandoned, because previously we created ephemeral node, possibly in other copier's process.
/// Initialize DROP PARTITION
- LOG_DEBUG(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
+ LOG_INFO(log, "Task {} has not been successfully finished by {}. Partition will be dropped and refilled.", current_task_piece_status_path, status.owner);
create_is_dirty_node(clean_state_clock);
return TaskStatus::Error;
@@ -1342,6 +1373,47 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
}
+ /// Try to create the table (if not exists) on each shard.
+ /// We have to create this table even if the partition piece is empty:
+ /// it keeps the rest of the code simpler.
+ {
+ /// 1) Get columns description from any replica of destination cluster
+ /// 2) Change ENGINE, database and table name
+ /// 3) Create helping table on the whole destination cluster
+ auto & settings_push = task_cluster->settings_push;
+
+ auto connection = task_table.cluster_push->getAnyShardInfo().pool->get(timeouts, &settings_push, true);
+ String create_query = getRemoteCreateTable(task_shard.task_table.table_push, *connection, settings_push);
+
+ ParserCreateQuery parser_create_query;
+ auto create_query_ast = parseQuery(parser_create_query, create_query, settings_push.max_query_size, settings_push.max_parser_depth);
+ /// Define helping table database and name for current partition piece
+ DatabaseAndTableName database_and_table_for_current_piece
+ {
+ task_table.table_push.first,
+ task_table.table_push.second + "_piece_" + toString(current_piece_number)
+ };
+
+
+ auto new_engine_push_ast = task_table.engine_push_ast;
+ if (task_table.isReplicatedTable())
+ new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
+
+ /// Take the column definitions from the destination table, the new database and table name, and the new engine (a non-replicated variant of MergeTree).
+ auto create_query_push_ast = rewriteCreateQueryStorage(create_query_ast, database_and_table_for_current_piece, new_engine_push_ast);
+ String query = queryToString(create_query_push_ast);
+
+ LOG_INFO(log, "Create destination tables. Query: \n {}", query);
+ UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, ClusterExecutionMode::ON_EACH_NODE);
+ LOG_INFO(
+ log,
+ "Destination tables {} have been created on {} shards of {}",
+ getQuotedTable(task_table.table_push),
+ shards,
+ task_table.cluster_push->getShardCount());
+ }
+
+
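Putting the three steps together, the DDL pushed to every node is the destination table's own CREATE with the table renamed to its _piece_N helping table and a replicated engine rewritten to its plain counterpart, roughly like this (illustrative, not from the patch):

    /// CREATE fetched from a destination replica (hypothetical):
    ///     CREATE TABLE db.hits (...) ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/hits', '{replica}') ORDER BY ...
    /// Query executed on each node for piece 3 (IF NOT EXISTS is added by rewriteCreateQueryStorage):
    ///     CREATE TABLE IF NOT EXISTS db.hits_piece_3 (...) ENGINE = MergeTree ORDER BY ...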
/// Exit if current piece is absent on this shard. Also mark it as finished, because we will check
/// whether each shard have processed each partitition (and its pieces).
if (partition_piece.is_absent_piece)
@@ -1349,9 +1421,9 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
String state_finished = TaskStateWithOwner::getData(TaskState::Finished, host_id);
auto res = zookeeper->tryCreate(current_task_piece_status_path, state_finished, zkutil::CreateMode::Persistent);
if (res == Coordination::Error::ZNODEEXISTS)
- LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
+ LOG_INFO(log, "Partition {} piece {} is absent on current replica of a shard. But other replicas have already marked it as done.", task_partition.name, current_piece_number);
if (res == Coordination::Error::ZOK)
- LOG_DEBUG(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
+ LOG_INFO(log, "Partition {} piece {} is absent on current replica of a shard. Will mark it as done. Other replicas will do the same.", task_partition.name, current_piece_number);
return TaskStatus::Finished;
}
@@ -1415,40 +1487,6 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
zookeeper->create(current_task_piece_status_path, start_state, zkutil::CreateMode::Persistent);
}
- /// Try create table (if not exists) on each shard
- {
- /// Define push table for current partition piece
- auto database_and_table_for_current_piece= std::pair(
- task_table.table_push.first,
- task_table.table_push.second + "_piece_" + toString(current_piece_number));
-
- auto new_engine_push_ast = task_table.engine_push_ast;
- if (task_table.isReplicatedTable())
- {
- new_engine_push_ast = task_table.rewriteReplicatedCreateQueryToPlain();
- }
-
- auto create_query_push_ast = rewriteCreateQueryStorage(
- task_shard.current_pull_table_create_query,
- database_and_table_for_current_piece, new_engine_push_ast);
-
- create_query_push_ast->as().if_not_exists = true;
- String query = queryToString(create_query_push_ast);
-
- LOG_DEBUG(log, "Create destination tables. Query: {}", query);
- UInt64 shards = executeQueryOnCluster(task_table.cluster_push, query, task_cluster->settings_push, PoolMode::GET_MANY);
- LOG_INFO(
- log,
- "Destination tables {} have been created on {} shards of {}",
- getQuotedTable(task_table.table_push),
- shards,
- task_table.cluster_push->getShardCount());
-
- if (shards != task_table.cluster_push->getShardCount())
- {
- return TaskStatus::Error;
- }
- }
/// Do the copying
{
@@ -1462,18 +1500,18 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
// Select all fields
ASTPtr query_select_ast = get_select_query(task_shard.table_read_shard, "*", /*enable_splitting*/ true, inject_fault ? "1" : "");
- LOG_DEBUG(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
+ LOG_INFO(log, "Executing SELECT query and pull from {} : {}", task_shard.getDescription(), queryToString(query_select_ast));
ASTPtr query_insert_ast;
{
String query;
- query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " VALUES ";
+ query += "INSERT INTO " + getQuotedTable(split_table_for_current_piece) + " FORMAT Native ";
ParserQuery p_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
query_insert_ast = parseQuery(p_query, query, settings.max_query_size, settings.max_parser_depth);
- LOG_DEBUG(log, "Executing INSERT query: {}", query);
+ LOG_INFO(log, "Executing INSERT query: {}", query);
}
try
@@ -1491,8 +1529,19 @@ TaskStatus ClusterCopier::processPartitionPieceTaskImpl(
BlockIO io_select = InterpreterFactory::get(query_select_ast, context_select)->execute();
BlockIO io_insert = InterpreterFactory::get(query_insert_ast, context_insert)->execute();
- input = io_select.getInputStream();
+ auto pure_input = io_select.getInputStream();
output = io_insert.out;
+
+ /// Add converting actions to make it possible to copy blocks with slightly different schema
+ const auto & select_block = pure_input->getHeader();
+ const auto & insert_block = output->getHeader();
+ auto actions_dag = ActionsDAG::makeConvertingActions(
+ select_block.getColumnsWithTypeAndName(),
+ insert_block.getColumnsWithTypeAndName(),
+ ActionsDAG::MatchColumnsMode::Position);
+ auto actions = std::make_shared(actions_dag, ExpressionActionsSettings::fromContext(getContext()));
+
+ input = std::make_shared(pure_input, actions);
}
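The converting step matters because the source shard and the helping table may disagree slightly in column types or names; MatchColumnsMode::Position pairs columns by position and inserts whatever casts and renames are needed. A made-up example of the mismatch it absorbs:

    /// SELECT header coming from the source shard (hypothetical):
    ///     id UInt32, event_date Date
    /// INSERT header of the helping table:
    ///     id UInt64, event_date Date
    /// The converting actions cast `id` to UInt64, so the pulled blocks can be
    /// pushed into the helping table without further changes.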
/// Fail-fast optimization to abort copying when the current clean state expires
@@ -1600,7 +1649,7 @@ void ClusterCopier::dropLocalTableIfExists(const DatabaseAndTableName & table_na
void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table, size_t current_piece_number)
{
- LOG_DEBUG(log, "Removing helping tables piece {}", current_piece_number);
+ LOG_INFO(log, "Removing helping tables piece {}", current_piece_number);
DatabaseAndTableName original_table = task_table.table_push;
DatabaseAndTableName helping_table
@@ -1611,17 +1660,17 @@ void ClusterCopier::dropHelpingTablesByPieceNumber(const TaskTable & task_table,
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
- LOG_DEBUG(log, "Execute distributed DROP TABLE: {}", query);
+ LOG_INFO(log, "Execute distributed DROP TABLE: {}", query);
/// We have to drop partition_piece on each replica
- UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, PoolMode::GET_MANY, ClusterExecutionMode::ON_EACH_NODE);
+ UInt64 num_nodes = executeQueryOnCluster(cluster_push, query, settings_push, ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP TABLE query was successfully executed on {} nodes.", toString(num_nodes));
}
void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
{
- LOG_DEBUG(log, "Removing helping tables");
+ LOG_INFO(log, "Removing helping tables");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
dropHelpingTablesByPieceNumber(task_table, current_piece_number);
@@ -1630,7 +1679,7 @@ void ClusterCopier::dropHelpingTables(const TaskTable & task_table)
void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskTable & task_table, const String & partition_name)
{
- LOG_DEBUG(log, "Try drop partition partition from all helping tables.");
+ LOG_INFO(log, "Try drop partition partition from all helping tables.");
for (size_t current_piece_number = 0; current_piece_number < task_table.number_of_splits; ++current_piece_number)
{
DatabaseAndTableName original_table = task_table.table_push;
@@ -1641,17 +1690,16 @@ void ClusterCopier::dropParticularPartitionPieceFromAllHelpingTables(const TaskT
const ClusterPtr & cluster_push = task_table.cluster_push;
Settings settings_push = task_cluster->settings_push;
- LOG_DEBUG(log, "Execute distributed DROP PARTITION: {}", query);
+ LOG_INFO(log, "Execute distributed DROP PARTITION: {}", query);
/// We have to drop partition_piece on each replica
UInt64 num_nodes = executeQueryOnCluster(
cluster_push, query,
settings_push,
- PoolMode::GET_MANY,
ClusterExecutionMode::ON_EACH_NODE);
LOG_INFO(log, "DROP PARTITION query was successfully executed on {} nodes.", toString(num_nodes));
}
- LOG_DEBUG(log, "All helping tables dropped partition {}", partition_name);
+ LOG_INFO(log, "All helping tables dropped partition {}", partition_name);
}
String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, Connection & connection, const Settings & settings)
@@ -1666,6 +1714,7 @@ String ClusterCopier::getRemoteCreateTable(const DatabaseAndTableName & table, C
return typeid_cast(*block.safeGetByPosition(0).column).getDataAt(0).toString();
}
+
ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & timeouts, TaskShard & task_shard)
{
/// Fetch and parse (possibly) new definition
@@ -1680,6 +1729,7 @@ ASTPtr ClusterCopier::getCreateTableForPullShard(const ConnectionTimeouts & time
return parseQuery(parser_create_query, create_query_pull_str, settings.max_query_size, settings.max_parser_depth);
}
+
/// If it is implicitly asked to create split Distributed table for certain piece on current shard, we will do it.
void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeouts,
TaskShard & task_shard, bool create_split)
@@ -1709,7 +1759,9 @@ void ClusterCopier::createShardInternalTables(const ConnectionTimeouts & timeout
auto storage_shard_ast = createASTStorageDistributed(shard_read_cluster_name, task_table.table_pull.first, task_table.table_pull.second);
- auto create_query_ast = removeAliasColumnsFromCreateQuery(task_shard.current_pull_table_create_query);
+ auto create_query_ast = removeAliasMaterializedAndTTLColumnsFromCreateQuery(
+ task_shard.current_pull_table_create_query,
+ task_table.allow_to_copy_alias_and_materialized_columns);
auto create_table_pull_ast = rewriteCreateQueryStorage(create_query_ast, task_shard.table_read_shard, storage_shard_ast);
dropAndCreateLocalTable(create_table_pull_ast);
@@ -1768,7 +1820,7 @@ std::set ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
- LOG_DEBUG(log, "Computing destination partition set, executing query: {}", query);
+ LOG_INFO(log, "Computing destination partition set, executing query: \n {}", query);
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
@@ -1787,7 +1839,7 @@ std::set ClusterCopier::getShardPartitions(const ConnectionTimeouts & ti
}
}
- LOG_DEBUG(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
+ LOG_INFO(log, "There are {} destination partitions in shard {}", res.size(), task_shard.getDescription());
return res;
}
@@ -1799,21 +1851,22 @@ bool ClusterCopier::checkShardHasPartition(const ConnectionTimeouts & timeouts,
TaskTable & task_table = task_shard.task_table;
- std::string query = "SELECT 1 FROM " + getQuotedTable(task_shard.table_read_shard)
- + " WHERE (" + queryToString(task_table.engine_push_partition_key_ast) +
- " = (" + partition_quoted_name + " AS partition_key))";
-
+ WriteBufferFromOwnString ss;
+ ss << "SELECT 1 FROM " << getQuotedTable(task_shard.table_read_shard);
+ ss << " WHERE (" << queryToString(task_table.engine_push_partition_key_ast);
+ ss << " = (" + partition_quoted_name << " AS partition_key))";
if (!task_table.where_condition_str.empty())
- query += " AND (" + task_table.where_condition_str + ")";
-
- query += " LIMIT 1";
-
- LOG_DEBUG(log, "Checking shard {} for partition {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, query);
+ ss << " AND (" << task_table.where_condition_str << ")";
+ ss << " LIMIT 1";
+ auto query = ss.str();
ParserQuery parser_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
ASTPtr query_ast = parseQuery(parser_query, query, settings.max_query_size, settings.max_parser_depth);
+ LOG_INFO(log, "Checking shard {} for partition {} existence, executing query: \n {}",
+ task_shard.getDescription(), partition_quoted_name, query_ast->formatForErrorMessage());
+
auto local_context = Context::createCopy(context);
local_context->setSettings(task_cluster->settings_pull);
return InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows() != 0;
@@ -1847,7 +1900,7 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
query += " LIMIT 1";
- LOG_DEBUG(log, "Checking shard {} for partition {} piece {} existence, executing query: {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
+ LOG_INFO(log, "Checking shard {} for partition {} piece {} existence, executing query: \n \u001b[36m {}", task_shard.getDescription(), partition_quoted_name, std::to_string(current_piece_number), query);
ParserQuery parser_query(query.data() + query.size());
const auto & settings = getContext()->getSettingsRef();
@@ -1857,12 +1910,13 @@ bool ClusterCopier::checkPresentPartitionPiecesOnCurrentShard(const ConnectionTi
local_context->setSettings(task_cluster->settings_pull);
auto result = InterpreterFactory::get(query_ast, local_context)->execute().getInputStream()->read().rows();
if (result != 0)
- LOG_DEBUG(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
+ LOG_INFO(log, "Partition {} piece number {} is PRESENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
else
- LOG_DEBUG(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
+ LOG_INFO(log, "Partition {} piece number {} is ABSENT on shard {}", partition_quoted_name, std::to_string(current_piece_number), task_shard.getDescription());
return result != 0;
}
+
/** Executes simple query (without output streams, for example DDL queries) on each shard of the cluster
* Returns number of shards for which at least one replica executed query successfully
*/
@@ -1870,112 +1924,69 @@ UInt64 ClusterCopier::executeQueryOnCluster(
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
- PoolMode pool_mode,
- ClusterExecutionMode execution_mode,
- UInt64 max_successful_executions_per_shard) const
+ ClusterExecutionMode execution_mode) const
{
- auto num_shards = cluster->getShardsInfo().size();
- std::vector per_shard_num_successful_replicas(num_shards, 0);
-
- ParserQuery p_query(query.data() + query.size());
- ASTPtr query_ast = parseQuery(p_query, query, current_settings.max_query_size, current_settings.max_parser_depth);
-
- /// We will have to execute query on each replica of a shard.
+ ClusterPtr cluster_for_query = cluster;
if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
- max_successful_executions_per_shard = 0;
+ cluster_for_query = cluster->getClusterWithReplicasAsShards(current_settings);
- std::atomic origin_replicas_number = 0;
+ std::vector> connections;
+ connections.reserve(cluster->getShardCount());
- /// We need to execute query on one replica at least
- auto do_for_shard = [&] (UInt64 shard_index, Settings shard_settings)
+ std::atomic successfully_executed = 0;
+
+ for (const auto & replicas : cluster_for_query->getShardsAddresses())
{
- setThreadName("QueryForShard");
-
- const Cluster::ShardInfo & shard = cluster->getShardsInfo().at(shard_index);
- UInt64 & num_successful_executions = per_shard_num_successful_replicas.at(shard_index);
- num_successful_executions = 0;
-
- auto increment_and_check_exit = [&] () -> bool
+ for (const auto & node : replicas)
{
- ++num_successful_executions;
- return max_successful_executions_per_shard && num_successful_executions >= max_successful_executions_per_shard;
- };
-
- UInt64 num_replicas = cluster->getShardsAddresses().at(shard_index).size();
-
- origin_replicas_number += num_replicas;
- UInt64 num_local_replicas = shard.getLocalNodeCount();
- UInt64 num_remote_replicas = num_replicas - num_local_replicas;
-
- /// In that case we don't have local replicas, but do it just in case
- for (UInt64 i = 0; i < num_local_replicas; ++i)
- {
- auto interpreter = InterpreterFactory::get(query_ast, getContext());
- interpreter->execute();
-
- if (increment_and_check_exit())
- return;
- }
-
- /// Will try to make as many as possible queries
- if (shard.hasRemoteConnections())
- {
- shard_settings.max_parallel_replicas = num_remote_replicas ? num_remote_replicas : 1;
-
- auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(shard_settings).getSaturated(shard_settings.max_execution_time);
- auto connections = shard.pool->getMany(timeouts, &shard_settings, pool_mode);
-
- auto shard_context = Context::createCopy(context);
- shard_context->setSettings(shard_settings);
-
- for (auto & connection : connections)
+ try
{
- if (connection.isNull())
- continue;
+ connections.emplace_back(std::make_shared(
+ node.host_name, node.port, node.default_database,
+ node.user, node.password, node.cluster, node.cluster_secret,
+ "ClusterCopier", node.compression, node.secure
+ ));
+
+ /// We execute only Alter, Create and Drop queries.
+ const auto header = Block{};
+
+ /// For unknown reason global context is passed to IStorage::read() method
+ /// So, task_identifier is passed as constructor argument. It is more obvious.
+ auto remote_query_executor = std::make_shared(
+ *connections.back(), query, header, getContext(),
+ /*throttler=*/nullptr, Scalars(), Tables(), QueryProcessingStage::Complete);
try
{
- /// CREATE TABLE and DROP PARTITION queries return empty block
- RemoteBlockInputStream stream{*connection, query, Block{}, shard_context};
- NullBlockOutputStream output{Block{}};
- copyData(stream, output);
-
- if (increment_and_check_exit())
- return;
+ remote_query_executor->sendQuery();
}
- catch (const Exception &)
+ catch (...)
{
- LOG_INFO(log, getCurrentExceptionMessage(false, true));
+ LOG_WARNING(log, "Seemns like node with address {} is unreachable.", node.host_name);
+ continue;
}
+
+ while (true)
+ {
+ auto block = remote_query_executor->read();
+ if (!block)
+ break;
+ }
+
+ remote_query_executor->finish();
+ ++successfully_executed;
+ break;
+ }
+ catch (...)
+ {
+ LOG_WARNING(log, "An error occurred while processing query : \n {}", query);
+ tryLogCurrentException(log);
+ continue;
}
}
- };
-
- {
- ThreadPool thread_pool(std::min(num_shards, getNumberOfPhysicalCPUCores()));
-
- for (UInt64 shard_index = 0; shard_index < num_shards; ++shard_index)
- thread_pool.scheduleOrThrowOnError([=, shard_settings = current_settings] { do_for_shard(shard_index, std::move(shard_settings)); });
-
- thread_pool.wait();
}
- UInt64 successful_nodes = 0;
- for (UInt64 num_replicas : per_shard_num_successful_replicas)
- {
- if (execution_mode == ClusterExecutionMode::ON_EACH_NODE)
- successful_nodes += num_replicas;
- else
- /// Count only successful shards
- successful_nodes += (num_replicas > 0);
- }
-
- if (execution_mode == ClusterExecutionMode::ON_EACH_NODE && successful_nodes != origin_replicas_number)
- {
- LOG_INFO(log, "There was an error while executing ALTER on each node. Query was executed on {} nodes. But had to be executed on {}", toString(successful_nodes), toString(origin_replicas_number.load()));
- }
-
- return successful_nodes;
+ return successfully_executed.load();
}
}
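For reference, the two ways the simplified executeQueryOnCluster is called elsewhere in this file, shown as a fragment (drop_query and attach_query are placeholder names):

    /// Execute on every replica of every shard; needed for the non-replicated
    /// helping tables, where each node must run its own DROP/CREATE:
    UInt64 num_nodes = executeQueryOnCluster(cluster_push, drop_query, settings_push,
                                             ClusterExecutionMode::ON_EACH_NODE);

    /// Default ON_EACH_SHARD mode: the replica loop above breaks after the first
    /// replica of a shard that executes the query successfully, so one success per shard counts:
    UInt64 num_shards = executeQueryOnCluster(cluster_push, attach_query, settings_push);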
diff --git a/programs/copier/ClusterCopier.h b/programs/copier/ClusterCopier.h
index 085fa2ece06..387b089724a 100644
--- a/programs/copier/ClusterCopier.h
+++ b/programs/copier/ClusterCopier.h
@@ -18,12 +18,13 @@ public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
- ContextMutablePtr context_)
+ ContextMutablePtr context_,
+ Poco::Logger * log_)
: WithMutableContext(context_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
- log(&Poco::Logger::get("ClusterCopier")) {}
+ log(log_) {}
void init();
@@ -117,14 +118,14 @@ protected:
TaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED and ALIAS columns from create table query
- static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
+ static ASTPtr removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns);
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
- static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
+ static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@@ -189,9 +190,7 @@ protected:
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
- PoolMode pool_mode = PoolMode::GET_ALL,
- ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
- UInt64 max_successful_executions_per_shard = 0) const;
+ ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD) const;
private:
String task_zookeeper_path;
@@ -208,7 +207,6 @@ private:
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
- Coordination::Stat task_description_current_stat{};
std::unique_ptr task_cluster;
diff --git a/programs/copier/ClusterCopierApp.cpp b/programs/copier/ClusterCopierApp.cpp
index 8925ab63f99..7a0b81309b0 100644
--- a/programs/copier/ClusterCopierApp.cpp
+++ b/programs/copier/ClusterCopierApp.cpp
@@ -22,8 +22,9 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
- log_level = config().getString("log-level", "trace");
+ log_level = config().getString("log-level", "info");
is_safe_mode = config().has("safe-mode");
+ is_status_mode = config().has("status");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
if (config().has("move-fault-probability"))
@@ -97,6 +98,7 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("base-dir").binding("base-dir"));
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
+ options.addOption(Poco::Util::Option("status", "", "Get the status of the current execution").binding("status"));
using Me = std::decay_t;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
@@ -106,6 +108,25 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
void ClusterCopierApp::mainImpl()
{
+ /// Status command
+ {
+ if (is_status_mode)
+ {
+ SharedContextHolder shared_context = Context::createShared();
+ auto context = Context::createGlobal(shared_context.get());
+ context->makeGlobalContext();
+ SCOPE_EXIT_SAFE(context->shutdown());
+
+ auto zookeeper = context->getZooKeeper();
+ auto status_json = zookeeper->get(task_path + "/status");
+
+ LOG_INFO(&logger(), "{}", status_json);
+ std::cout << status_json << std::endl;
+
+ context->resetZooKeeper();
+ return;
+ }
+ }
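A sketch of how the new mode could be invoked, assuming the usual --config-file and --task-path options that initialize() reads plus the --status flag defined above; the printed JSON shape is an assumption:

    /// clickhouse-copier --config-file ./copier-config.xml --task-path /clickhouse/copier/task1 --status
    /// prints the JSON stored at /clickhouse/copier/task1/status, for example:
    ///     {"hits": {"all_partitions_count": 12, "processed_partitions_count": 7}}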
StatusFile status_file(process_path + "/status", StatusFile::write_full_info);
ThreadStatus thread_status;
@@ -136,7 +157,7 @@ void ClusterCopierApp::mainImpl()
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(context);
- auto copier = std::make_unique(task_path, host_id, default_database, context);
+ auto copier = std::make_unique(task_path, host_id, default_database, context, log);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);
diff --git a/programs/copier/ClusterCopierApp.h b/programs/copier/ClusterCopierApp.h
index 257b10cf196..cce07e338c0 100644
--- a/programs/copier/ClusterCopierApp.h
+++ b/programs/copier/ClusterCopierApp.h
@@ -76,8 +76,9 @@ private:
std::string config_xml_path;
std::string task_path;
- std::string log_level = "trace";
+ std::string log_level = "info";
bool is_safe_mode = false;
+ bool is_status_mode = false;
double copy_fault_probability = 0.0;
double move_fault_probability = 0.0;
bool is_help = false;
diff --git a/programs/copier/StatusAccumulator.h b/programs/copier/StatusAccumulator.h
new file mode 100644
index 00000000000..6e20e3dc95d
--- /dev/null
+++ b/programs/copier/StatusAccumulator.h
@@ -0,0 +1,65 @@
+#pragma once
+
+
+#include
+#include
+#include
+#include
+
+#include
+#include
+#include
+#include
+
+namespace DB
+{
+
+class StatusAccumulator
+{
+ public:
+ struct TableStatus
+ {
+ size_t all_partitions_count;
+ size_t processed_partitions_count;
+ };
+
+ using Map = std::unordered_map;
+ using MapPtr = std::shared_ptr