Merge branch 'master' into ast-table-identifier-2

This commit is contained in:
Ivan 2021-06-10 16:43:22 +03:00 committed by GitHub
commit a7fa4e641c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
150 changed files with 3475 additions and 811 deletions

5
.gitignore vendored
View File

@ -14,6 +14,11 @@
/build-*
/tests/venv
# logs
*.log
*.stderr
*.stdout
/docs/build
/docs/publish
/docs/edit

View File

@ -2,6 +2,7 @@
#### Upgrade Notes
* Do not upgrade if you have partition key with `UUID`.
* `zstd` compression library is updated to v1.5.0. You may get messages about "checksum does not match" in replication. These messages are expected due to update of compression algorithm and you can ignore them. These messages are informational and do not indicate any kinds of undesired behaviour.
* The setting `compile_expressions` is enabled by default. Although it has been heavily tested on variety of scenarios, if you find some undesired behaviour on your servers, you can try turning this setting off.
* Values of `UUID` type cannot be compared with integer. For example, instead of writing `uuid != 0` type `uuid != '00000000-0000-0000-0000-000000000000'`.
@ -763,6 +764,7 @@
* Allow using extended integer types (`Int128`, `Int256`, `UInt256`) in `avg` and `avgWeighted` functions. Also allow using different types (integer, decimal, floating point) for value and for weight in `avgWeighted` function. This is a backward-incompatible change: now the `avg` and `avgWeighted` functions always return `Float64` (as documented). Before this change the return type for `Decimal` arguments was also `Decimal`. [#15419](https://github.com/ClickHouse/ClickHouse/pull/15419) ([Mike](https://github.com/myrrc)).
* Expression `toUUID(N)` no longer works. Replace with `toUUID('00000000-0000-0000-0000-000000000000')`. This change is motivated by non-obvious results of `toUUID(N)` where N is non zero.
* SSL Certificates with incorrect "key usage" are rejected. In previous versions they are used to work. See [#19262](https://github.com/ClickHouse/ClickHouse/issues/19262).
* `incl` references to substitutions file (`/etc/metrika.xml`) were removed from the default config (`<remote_servers>`, `<zookeeper>`, `<macros>`, `<compression>`, `<networks>`). If you were using substitutions file and were relying on those implicit references, you should put them back manually and explicitly by adding corresponding sections with `incl="..."` attributes before the update. See [#18740](https://github.com/ClickHouse/ClickHouse/pull/18740) ([alexey-milovidov](https://github.com/alexey-milovidov)).
#### New Feature

View File

@ -4,12 +4,14 @@
#include <Core/Block.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/TextLog.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <sys/time.h>
#include <Poco/Message.h>
#include <Common/CurrentThread.h>
#include <Common/DNSResolver.h>
#include <common/getThreadId.h>
#include <Common/SensitiveDataMasker.h>
#include <Common/IO.h>
namespace DB
{
@ -26,16 +28,48 @@ void OwnSplitChannel::log(const Poco::Message & msg)
auto matches = masker->wipeSensitiveData(message_text);
if (matches > 0)
{
logSplit({msg, message_text}); // we will continue with the copy of original message with text modified
tryLogSplit({msg, message_text}); // we will continue with the copy of original message with text modified
return;
}
}
logSplit(msg);
tryLogSplit(msg);
}
void OwnSplitChannel::tryLogSplit(const Poco::Message & msg)
{
try
{
logSplit(msg);
}
/// It is better to catch the errors here in order to avoid
/// breaking some functionality because of unexpected "File not
/// found" (or similar) error.
///
/// For example StorageDistributedDirectoryMonitor will mark batch
/// as broken, some MergeTree code can also be affected.
///
/// Also note, that we cannot log the exception here, since this
/// will lead to recursion, using regular tryLogCurrentException().
/// but let's log it into the stderr at least.
catch (...)
{
MemoryTracker::LockExceptionInThread lock_memory_tracker(VariableContext::Global);
const std::string & exception_message = getCurrentExceptionMessage(true);
const std::string & message = msg.getText();
/// NOTE: errors are ignored, since nothing can be done.
writeRetry(STDERR_FILENO, "Cannot add message to the log: ");
writeRetry(STDERR_FILENO, message.data(), message.size());
writeRetry(STDERR_FILENO, "\n");
writeRetry(STDERR_FILENO, exception_message.data(), exception_message.size());
writeRetry(STDERR_FILENO, "\n");
}
}
void OwnSplitChannel::logSplit(const Poco::Message & msg)
{
ExtendedLogMessage msg_ext = ExtendedLogMessage::getFrom(msg);

View File

@ -24,6 +24,7 @@ public:
private:
void logSplit(const Poco::Message & msg);
void tryLogSplit(const Poco::Message & msg);
using ChannelPtr = Poco::AutoPtr<Poco::Channel>;
/// Handler and its pointer casted to extended interface

View File

@ -1,6 +1,6 @@
if (SANITIZE OR NOT (
((OS_LINUX OR OS_FREEBSD) AND (ARCH_AMD64 OR ARCH_ARM OR ARCH_PPC64LE)) OR
(OS_DARWIN AND CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo")
(OS_DARWIN AND (CMAKE_BUILD_TYPE STREQUAL "RelWithDebInfo" OR CMAKE_BUILD_TYPE STREQUAL "Debug"))
))
if (ENABLE_JEMALLOC)
message (${RECONFIGURE_MESSAGE_LEVEL}

View File

@ -1 +1 @@
#*/10 * * * * root (which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart > /dev/null 2>&1
#*/10 * * * * root ((which service > /dev/null 2>&1 && (service clickhouse-server condstart ||:)) || /etc/init.d/clickhouse-server condstart) > /dev/null 2>&1

View File

@ -229,6 +229,7 @@ status()
case "$1" in
status)
status
exit 0
;;
esac

View File

@ -97,14 +97,10 @@ function fuzz
NEW_TESTS_OPT="${NEW_TESTS_OPT:-}"
fi
export CLICKHOUSE_WATCHDOG_ENABLE=0 # interferes with gdb
clickhouse-server --config-file db/config.xml -- --path db 2>&1 | tail -100000 > server.log &
server_pid=$!
kill -0 $server_pid
while ! clickhouse-client --query "select 1" && kill -0 $server_pid ; do echo . ; sleep 1 ; done
clickhouse-client --query "select 1"
kill -0 $server_pid
echo Server started
echo "
handle all noprint
@ -115,12 +111,31 @@ thread apply all backtrace
continue
" > script.gdb
gdb -batch -command script.gdb -p "$(pidof clickhouse-server)" &
gdb -batch -command script.gdb -p $server_pid &
# Check connectivity after we attach gdb, because it might cause the server
# to freeze and the fuzzer will fail.
for _ in {1..60}
do
sleep 1
if clickhouse-client --query "select 1"
then
break
fi
done
clickhouse-client --query "select 1" # This checks that the server is responding
kill -0 $server_pid # This checks that it is our server that is started and not some other one
echo Server started and responded
# SC2012: Use find instead of ls to better handle non-alphanumeric filenames. They are all alphanumeric.
# SC2046: Quote this to prevent word splitting. Actually I need word splitting.
# shellcheck disable=SC2012,SC2046
clickhouse-client --query-fuzzer-runs=1000 --queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) $NEW_TESTS_OPT \
clickhouse-client \
--receive_timeout=10 \
--receive_data_timeout_ms=10000 \
--query-fuzzer-runs=1000 \
--queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
$NEW_TESTS_OPT \
> >(tail -n 100000 > fuzzer.log) \
2>&1 &
fuzzer_pid=$!
@ -198,13 +213,17 @@ continue
echo "success" > status.txt
echo "OK" > description.txt
else
# The server was alive, but the fuzzer returned some error. Probably this
# is a problem in the fuzzer itself. Don't grep the server log in this
# case, because we will find a message about normal server termination
# (Received signal 15), which is confusing.
# The server was alive, but the fuzzer returned some error. This might
# be some client-side error detected by fuzzing, or a problem in the
# fuzzer itself. Don't grep the server log in this case, because we will
# find a message about normal server termination (Received signal 15),
# which is confusing.
task_exit_code=$fuzzer_exit_code
echo "failure" > status.txt
echo "Fuzzer failed ($fuzzer_exit_code). See the logs." > description.txt
{ grep -o "Found error:.*" fuzzer.log \
|| grep -o "Exception.*" fuzzer.log \
|| echo "Fuzzer failed ($fuzzer_exit_code). See the logs." ; } \
| tail -1 > description.txt
fi
}

View File

@ -554,12 +554,6 @@ create table query_metric_stats_denorm engine File(TSVWithNamesAndTypes,
" 2> >(tee -a analyze/errors.log 1>&2)
# Fetch historical query variability thresholds from the CI database
clickhouse-local --query "
left join file('analyze/report-thresholds.tsv', TSV,
'test text, report_threshold float') thresholds
on query_metric_stats.test = thresholds.test
"
if [ -v CHPC_DATABASE_URL ]
then
set +x # Don't show password in the log
@ -577,7 +571,8 @@ then
--date_time_input_format=best_effort)
# Precision is going to be 1.5 times worse for PRs. How do I know it? I ran this:
# Precision is going to be 1.5 times worse for PRs, because we run the queries
# less times. How do I know it? I ran this:
# SELECT quantilesExact(0., 0.1, 0.5, 0.75, 0.95, 1.)(p / m)
# FROM
# (
@ -592,19 +587,27 @@ then
# query_display_name
# HAVING count(*) > 100
# )
# The file can be empty if the server is inaccessible, so we can't use TSVWithNamesAndTypes.
#
# The file can be empty if the server is inaccessible, so we can't use
# TSVWithNamesAndTypes.
#
"${client[@]}" --query "
select test, query_index,
quantileExact(0.99)(abs(diff)) max_diff,
quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 max_stat_threshold,
quantileExact(0.99)(abs(diff)) * 1.5 AS max_diff,
quantileExactIf(0.99)(stat_threshold, abs(diff) < stat_threshold) * 1.5 AS max_stat_threshold,
query_display_name
from query_metrics_v2
where event_date > now() - interval 1 month
-- We use results at least one week in the past, so that the current
-- changes do not immediately influence the statistics, and we have
-- some time to notice that something is wrong.
where event_date between now() - interval 1 month - interval 1 week
and now() - interval 1 week
and metric = 'client_time'
and pr_number = 0
group by test, query_index, query_display_name
having count(*) > 100
" > analyze/historical-thresholds.tsv
set -x
else
touch analyze/historical-thresholds.tsv
fi
@ -1224,6 +1227,55 @@ unset IFS
function upload_results
{
# Prepare info for the CI checks table.
rm ci-checks.tsv
clickhouse-local --query "
create view queries as select * from file('report/queries.tsv', TSVWithNamesAndTypes,
'changed_fail int, changed_show int, unstable_fail int, unstable_show int,
left float, right float, diff float, stat_threshold float,
test text, query_index int, query_display_name text');
create table ci_checks engine File(TSVWithNamesAndTypes, 'ci-checks.tsv')
as select
$PR_TO_TEST pull_request_number,
'$SHA_TO_TEST' commit_sha,
'Performance' check_name,
'$(sed -n 's/.*<!--status: \(.*\)-->/\1/p' report.html)' check_status,
-- TODO toDateTime() can't parse output of 'date', so no time for now.
($(date +%s) - $CHPC_CHECK_START_TIMESTAMP) * 1000 check_duration_ms,
fromUnixTimestamp($CHPC_CHECK_START_TIMESTAMP) check_start_time,
test_name,
test_status,
test_duration_ms,
report_url,
$PR_TO_TEST = 0
? 'https://github.com/ClickHouse/ClickHouse/commit/$SHA_TO_TEST'
: 'https://github.com/ClickHouse/ClickHouse/pull/$PR_TO_TEST' pull_request_url,
'' commit_url,
'' task_url,
'' base_ref,
'' base_repo,
'' head_ref,
'' head_repo
from (
select '' test_name,
'$(sed -n 's/.*<!--message: \(.*\)-->/\1/p' report.html)' test_status,
0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#fail1' report_url
union all
select test || ' #' || toString(query_index), 'slower' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#changes-in-performance.'
|| test || '.' || toString(query_index) report_url
from queries where changed_fail != 0 and diff > 0
union all
select test || ' #' || toString(query_index), 'unstable' test_status, 0 test_duration_ms,
'https://clickhouse-test-reports.s3.yandex.net/$PR_TO_TEST/$SHA_TO_TEST/performance_comparison/report.html#unstable-queries.'
|| test || '.' || toString(query_index) report_url
from queries where unstable_fail != 0
)
;
"
if ! [ -v CHPC_DATABASE_URL ]
then
echo Database for test results is not specified, will not upload them.
@ -1292,6 +1344,10 @@ $REF_SHA $SHA_TO_TEST $(numactl --show | sed -n 's/^cpubind:[[:space:]]\+/numact
$REF_SHA $SHA_TO_TEST $(numactl --hardware | sed -n 's/^available:[[:space:]]\+/numactl-available /p')
EOF
# Also insert some data about the check into the CI checks table.
"${client[@]}" --query "INSERT INTO "'"'"gh-data"'"'".checks FORMAT TSVWithNamesAndTypes" \
< ci-checks.tsv
set -x
}

View File

@ -1,6 +1,9 @@
#!/bin/bash
set -ex
CHPC_CHECK_START_TIMESTAMP="$(date +%s)"
export CHPC_CHECK_START_TIMESTAMP
# Use the packaged repository to find the revision we will compare to.
function find_reference_sha
{

View File

@ -561,8 +561,9 @@ if args.report == 'main':
# Don't show mildly unstable queries, only the very unstable ones we
# treat as errors.
if very_unstable_queries:
error_tests += very_unstable_queries
status = 'failure'
if very_unstable_queries > 3:
error_tests += very_unstable_queries
status = 'failure'
message_array.append(str(very_unstable_queries) + ' unstable')
error_tests += slow_average_tests

View File

@ -8,6 +8,7 @@ RUN apt-get update -y && \
python3-wheel \
brotli \
netcat-openbsd \
postgresql-client \
zstd
RUN python3 -m pip install \

View File

@ -1,6 +1,15 @@
#!/bin/bash
set -e
echo "Configure to use Yandex dockerhub-proxy"
mkdir -p /etc/docker/
cat > /etc/docker/daemon.json << EOF
{
"insecure-registries" : ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
"registry-mirrors" : ["http://dockerhub-proxy.sas.yp-c.yandex.net:5000"]
}
EOF
dockerd --host=unix:///var/run/docker.sock --host=tcp://0.0.0.0:2375 &>/var/log/somefile &
set +e
@ -16,14 +25,6 @@ while true; do
done
set -e
echo "Configure to use Yandex dockerhub-proxy"
cat > /etc/docker/daemon.json << EOF
{
"insecure-registries": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"],
"registry-mirrors": ["dockerhub-proxy.sas.yp-c.yandex.net:5000"]
}
EOF
echo "Start tests"
export CLICKHOUSE_TESTS_SERVER_BIN_PATH=/clickhouse
export CLICKHOUSE_TESTS_CLIENT_BIN_PATH=/clickhouse

View File

@ -32,7 +32,7 @@ CREATE TABLE `ontime`
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Tail_Number` String,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,

View File

@ -94,6 +94,15 @@ For production environments, its recommended to use the latest `stable`-versi
To run ClickHouse inside Docker follow the guide on [Docker Hub](https://hub.docker.com/r/yandex/clickhouse-server/). Those images use official `deb` packages inside.
### Single Binary
You can install ClickHouse on Linux using single portable binary from the latest commit of the `master` branch: [https://builds.clickhouse.tech/master/amd64/clickhouse].
```
curl -O 'https://builds.clickhouse.tech/master/amd64/clickhouse' && chmod a+x clickhouse
sudo ./clickhouse install
```
### From Precompiled Binaries for Non-Standard Environments {#from-binaries-non-linux}
For non-Linux operating systems and for AArch64 CPU arhitecture, ClickHouse builds are provided as a cross-compiled binary from the latest commit of the `master` branch (with a few hours delay).
@ -104,7 +113,7 @@ For non-Linux operating systems and for AArch64 CPU arhitecture, ClickHouse buil
After downloading, you can use the `clickhouse client` to connect to the server, or `clickhouse local` to process local data.
Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (also with needed configuration files, configuring users etc.). After that run `clickhouse start` commands to start the clickhouse-server and `clickhouse-client` to connect to it.
Run `sudo ./clickhouse install` if you want to install clickhouse system-wide (also with needed configuration files, configuring users etc.). After that run `clickhouse start` commands to start the clickhouse-server and `clickhouse-client` to connect to it.
These builds are not recommended for use in production environments because they are less thoroughly tested, but you can do so on your own risk. They also have only a subset of ClickHouse features available.

View File

@ -0,0 +1,107 @@
---
toc_priority: 146
toc_title: intervalLengthSum
---
# intervalLengthSum {#agg_function-intervallengthsum}
Calculates the total length of union of all ranges (segments on numeric axis).
**Syntax**
``` sql
intervalLengthSum(start, end)
```
**Arguments**
- `start` — The starting value of the interval. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) or [Date](../../../sql-reference/data-types/date.md#data_type-date).
- `end` — The ending value of the interval. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) or [Date](../../../sql-reference/data-types/date.md#data_type-date).
!!! info "Note"
Arguments must be of the same data type. Otherwise, an exception will be thrown.
**Returned value**
- Total length of union of all ranges (segments on numeric axis). Depending on the type of the argument, the return value may be [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) or [Float64](../../../sql-reference/data-types/float.md#float32-float64) type.
**Examples**
1. Input table:
``` text
┌─id─┬─start─┬─end─┐
│ a │ 1.1 │ 2.9 │
│ a │ 2.5 │ 3.2 │
│ a │ 4 │ 5 │
└────┴───────┴─────┘
```
In this example, the arguments of the Float32 type are used. The function returns a value of the Float64 type.
Result is the sum of lengths of intervals `[1.1, 3.2]` (union of `[1.1, 2.9]` and `[2.5, 3.2]`) and `[4, 5]`
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM fl_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 3.1 │ Float64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
2. Input table:
``` text
┌─id─┬───────────────start─┬─────────────────end─┐
│ a │ 2020-01-01 01:12:30 │ 2020-01-01 02:10:10 │
│ a │ 2020-01-01 02:05:30 │ 2020-01-01 02:50:31 │
│ a │ 2020-01-01 03:11:22 │ 2020-01-01 03:23:31 │
└────┴─────────────────────┴─────────────────────┘
```
In this example, the arguments of the DateTime type are used. The function returns a value in seconds.
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM dt_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 6610 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
3. Input table:
``` text
┌─id─┬──────start─┬────────end─┐
│ a │ 2020-01-01 │ 2020-01-04 │
│ a │ 2020-01-12 │ 2020-01-18 │
└────┴────────────┴────────────┘
```
In this example, the arguments of the Date type are used. The function returns a value in days.
Query:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM date_interval GROUP BY id ORDER BY id;
```
Result:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 9 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```

View File

@ -0,0 +1,37 @@
---
toc_priority: 145
---
# sumKahan {#agg_function-sumKahan}
Calculates the sum of the numbers with [Kahan compensated summation algorithm](https://en.wikipedia.org/wiki/Kahan_summation_algorithm)
**Syntax**
``` sql
sumKahan(x)
```
**Arguments**
- `x` — Input value, must be [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md).
**Returned value**
- the sum of numbers, with type [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), or [Decimal](../../../sql-reference/data-types/decimal.md) depends on type of input arguments
**Example**
Query:
``` sql
SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
```
Result:
``` text
┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
│ 0.9999999999999999 │ 1 │
└────────────────────┴───────────────┘
```

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Similar to `topK` but takes one additional argument of integer type - `weight`. Every value is accounted `weight` times for frequency calculation.
Returns an array of the approximately most frequent values in the specified column. The resulting array is sorted in descending order of approximate frequency of values (not by the values themselves). Additionally, the weight of the value is taken into account.
**Syntax**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Arguments**
- `N` — The number of elements to return.
**Arguments**
- `x` — The value.
- `weight` — The weight. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — The weight. Every value is accounted `weight` times for frequency calculation. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Returned value**
@ -40,3 +37,7 @@ Result:
│ [999,998,997,996,995,994,993,992,991,990] │
└───────────────────────────────────────────┘
```
**See Also**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -159,7 +159,7 @@ Configuration fields:
| Tag | Description | Required |
|------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|----------|
| `name` | Column name. | Yes |
| `type` | ClickHouse data type.<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `type` | ClickHouse data type: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md), [String](../../../sql-reference/data-types/string.md).<br/>ClickHouse tries to cast value from dictionary to the specified data type. For example, for MySQL, the field might be `TEXT`, `VARCHAR`, or `BLOB` in the MySQL source table, but it can be uploaded as `String` in ClickHouse.<br/>[Nullable](../../../sql-reference/data-types/nullable.md) is currently supported for [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache) dictionaries. In [IPTrie](external-dicts-dict-layout.md#ip-trie) dictionaries `Nullable` types are not supported. | Yes |
| `null_value` | Default value for a non-existing element.<br/>In the example, it is an empty string. [NULL](../../syntax.md#null-literal) value can be used only for the `Nullable` types (see the previous line with types description). | Yes |
| `expression` | [Expression](../../../sql-reference/syntax.md#syntax-expressions) that ClickHouse executes on the value.<br/>The expression can be a column name in the remote SQL database. Thus, you can use it to create an alias for the remote column.<br/><br/>Default value: no expression. | No |
| <a name="hierarchical-dict-attr"></a> `hierarchical` | If `true`, the attribute contains the value of a parent key for the current key. See [Hierarchical Dictionaries](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-hierarchical.md).<br/><br/>Default value: `false`. | No |

View File

@ -0,0 +1,107 @@
---
toc_priority: 146
toc_title: intervalLengthSum
---
# intervalLengthSum {#agg_function-intervallengthsum}
Вычисляет длину объединения интервалов (отрезков на числовой оси).
**Синтаксис**
``` sql
intervalLengthSum(start, end)
```
**Аргументы**
- `start` — начальное значение интервала. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) или [Date](../../../sql-reference/data-types/date.md#data_type-date).
- `end` — конечное значение интервала. [Int32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Int64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt32](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64), [Float32](../../../sql-reference/data-types/float.md#float32-float64), [Float64](../../../sql-reference/data-types/float.md#float32-float64), [DateTime](../../../sql-reference/data-types/datetime.md#data_type-datetime) или [Date](../../../sql-reference/data-types/date.md#data_type-date).
!!! info "Примечание"
Аргументы должны быть одного типа. В противном случае ClickHouse сгенерирует исключение.
**Возвращаемое значение**
- Длина объединения всех интервалов (отрезков на числовой оси). В зависимости от типа аргумента возвращаемое значение может быть типа [UInt64](../../../sql-reference/data-types/int-uint.md#uint8-uint16-uint32-uint64-int8-int16-int32-int64) или [Float64](../../../sql-reference/data-types/float.md#float32-float64).
**Примеры**
1. Входная таблица:
``` text
┌─id─┬─start─┬─end─┐
│ a │ 1.1 │ 2.9 │
│ a │ 2.5 │ 3.2 │
│ a │ 4 │ 5 │
└────┴───────┴─────┘
```
В этом примере используются аргументы типа Float32. Функция возвращает значение типа Float64.
Результатом функции будет сумма длин интервалов `[1.1, 3.2]` (объединение `[1.1, 2.9]` и `[2.5, 3.2]`) и `[4, 5]`
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM fl_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 3.1 │ Float64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
2. Входная таблица:
``` text
┌─id─┬───────────────start─┬─────────────────end─┐
│ a │ 2020-01-01 01:12:30 │ 2020-01-01 02:10:10 │
│ a │ 2020-01-01 02:05:30 │ 2020-01-01 02:50:31 │
│ a │ 2020-01-01 03:11:22 │ 2020-01-01 03:23:31 │
└────┴─────────────────────┴─────────────────────┘
```
В этом примере используются аргументы типа DateTime. Функция возвращает значение, выраженное в секундах.
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM dt_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 6610 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```
3. Входная таблица:
``` text
┌─id─┬──────start─┬────────end─┐
│ a │ 2020-01-01 │ 2020-01-04 │
│ a │ 2020-01-12 │ 2020-01-18 │
└────┴────────────┴────────────┘
```
В этом примере используются аргументы типа Date. Функция возвращает значение, выраженное в днях.
Запрос:
``` sql
SELECT id, intervalLengthSum(start, end), toTypeName(intervalLengthSum(start, end)) FROM date_interval GROUP BY id ORDER BY id;
```
Результат:
``` text
┌─id─┬─intervalLengthSum(start, end)─┬─toTypeName(intervalLengthSum(start, end))─┐
│ a │ 9 │ UInt64 │
└────┴───────────────────────────────┴───────────────────────────────────────────┘
```

View File

@ -0,0 +1,37 @@
---
toc_priority: 145
---
# sumKahan {#agg_function-sumKahan}
Вычисляет сумму с использованием [компенсационного суммирования по алгоритму Кэхэна](https://ru.wikipedia.org/wiki/Алгоритм_Кэхэна)
**Синтаксис**
``` sql
sumKahan(x)
```
**Аргументы**
- `x` — Входное значение типа [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), или [Decimal](../../../sql-reference/data-types/decimal.md).
**Возвращемое значение**
- сумма чисел с типом [Integer](../../../sql-reference/data-types/int-uint.md), [Float](../../../sql-reference/data-types/float.md), ил [Decimal](../../../sql-reference/data-types/decimal.md) зависящим от типа входящих аргументов
**Пример**
Запрос:
``` sql
SELECT sum(0.1), sumKahan(0.1) FROM numbers(10);
```
Результат:
``` text
┌───────────sum(0.1)─┬─sumKahan(0.1)─┐
│ 0.9999999999999999 │ 1 │
└────────────────────┴───────────────┘
```

View File

@ -4,7 +4,7 @@ toc_priority: 109
# topKWeighted {#topkweighted}
Аналогична `topK`, но дополнительно принимает положительный целочисленный параметр `weight`. Каждое значение учитывается `weight` раз при расчёте частоты.
Возвращает массив наиболее часто встречающихся значений в указанном столбце. Результирующий массив упорядочен по убыванию частоты значения (не по самим значениям). Дополнительно учитывается вес значения.
**Синтаксис**
@ -15,11 +15,8 @@ topKWeighted(N)(x, weight)
**Аргументы**
- `N` — количество элементов для выдачи.
**Аргументы**
- `x` — значение.
- `weight` — вес. [UInt8](../../../sql-reference/data-types/int-uint.md).
- `weight` — вес. Каждое значение учитывается `weight` раз при расчёте частоты. [UInt64](../../../sql-reference/data-types/int-uint.md).
**Возвращаемое значение**
@ -41,3 +38,6 @@ SELECT topKWeighted(10)(number, number) FROM numbers(1000)
└───────────────────────────────────────────┘
```
**Смотрите также**
- [topK](../../../sql-reference/aggregate-functions/reference/topk.md)

View File

@ -159,7 +159,7 @@ CREATE DICTIONARY somename (
| Тег | Описание | Обязательный |
|------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--------------|
| `name` | Имя столбца. | Да |
| `type` | Тип данных ClickHouse.<br/>ClickHouse пытается привести значение из словаря к заданному типу данных. Например, в случае MySQL, в таблице-источнике поле может быть `TEXT`, `VARCHAR`, `BLOB`, но загружено может быть как `String`. <br/>[Nullable](../../../sql-reference/data-types/nullable.md) в настоящее время поддерживается для словарей [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache). Для словарей [IPTrie](external-dicts-dict-layout.md#ip-trie) `Nullable`-типы не поддерживаются. | Да |
| `type` | Тип данных ClickHouse: [UInt8](../../../sql-reference/data-types/int-uint.md), [UInt16](../../../sql-reference/data-types/int-uint.md), [UInt32](../../../sql-reference/data-types/int-uint.md), [UInt64](../../../sql-reference/data-types/int-uint.md), [Int8](../../../sql-reference/data-types/int-uint.md), [Int16](../../../sql-reference/data-types/int-uint.md), [Int32](../../../sql-reference/data-types/int-uint.md), [Int64](../../../sql-reference/data-types/int-uint.md), [Float32](../../../sql-reference/data-types/float.md), [Float64](../../../sql-reference/data-types/float.md), [UUID](../../../sql-reference/data-types/uuid.md), [Decimal32](../../../sql-reference/data-types/decimal.md), [Decimal64](../../../sql-reference/data-types/decimal.md), [Decimal128](../../../sql-reference/data-types/decimal.md), [Decimal256](../../../sql-reference/data-types/decimal.md), [String](../../../sql-reference/data-types/string.md).<br/>ClickHouse пытается привести значение из словаря к заданному типу данных. Например, в случае MySQL, в таблице-источнике поле может быть `TEXT`, `VARCHAR`, `BLOB`, но загружено может быть как `String`. <br/>[Nullable](../../../sql-reference/data-types/nullable.md) в настоящее время поддерживается для словарей [Flat](external-dicts-dict-layout.md#flat), [Hashed](external-dicts-dict-layout.md#dicts-external_dicts_dict_layout-hashed), [ComplexKeyHashed](external-dicts-dict-layout.md#complex-key-hashed), [Direct](external-dicts-dict-layout.md#direct), [ComplexKeyDirect](external-dicts-dict-layout.md#complex-key-direct), [RangeHashed](external-dicts-dict-layout.md#range-hashed), [Polygon](external-dicts-dict-polygon.md), [Cache](external-dicts-dict-layout.md#cache), [ComplexKeyCache](external-dicts-dict-layout.md#complex-key-cache), [SSDCache](external-dicts-dict-layout.md#ssd-cache), [SSDComplexKeyCache](external-dicts-dict-layout.md#complex-key-ssd-cache). Для словарей [IPTrie](external-dicts-dict-layout.md#ip-trie) `Nullable`-типы не поддерживаются. | Да |
| `null_value` | Значение по умолчанию для несуществующего элемента.<br/>В примере это пустая строка. Значение [NULL](../../syntax.md#null-literal) можно указывать только для типов `Nullable` (см. предыдущую строку с описанием типов). | Да |
| `expression` | [Выражение](../../syntax.md#syntax-expressions), которое ClickHouse выполняет со значением.<br/>Выражением может быть имя столбца в удаленной SQL базе. Таким образом, вы можете использовать его для создания псевдонима удаленного столбца.<br/><br/>Значение по умолчанию: нет выражения. | Нет |
| <a name="hierarchical-dict-attr"></a> `hierarchical` | Если `true`, то атрибут содержит ключ предка для текущего элемента. Смотрите [Иерархические словари](external-dicts-dict-hierarchical.md).<br/><br/>Значение по умолчанию: `false`. | Нет |

View File

@ -1336,7 +1336,7 @@ private:
fmt::print(
stderr,
"IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
"Found error: IAST::clone() is broken for some AST node. This is a bug. The original AST ('dump before fuzz') and its cloned copy ('dump of cloned AST') refer to the same nodes, which must never happen. This means that their parent node doesn't implement clone() correctly.");
exit(1);
}
@ -1461,7 +1461,7 @@ private:
const auto text_3 = ast_3->formatForErrorMessage();
if (text_3 != text_2)
{
fmt::print(stderr, "The query formatting is broken.\n");
fmt::print(stderr, "Found error: The query formatting is broken.\n");
printChangedSettings();

View File

@ -325,14 +325,14 @@ void QueryFuzzer::fuzzColumnLikeExpressionList(IAST * ast)
// the generic recursion into IAST.children.
}
void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
void QueryFuzzer::fuzzWindowFrame(ASTWindowDefinition & def)
{
switch (fuzz_rand() % 40)
{
case 0:
{
const auto r = fuzz_rand() % 3;
frame.type = r == 0 ? WindowFrame::FrameType::Rows
def.frame_type = r == 0 ? WindowFrame::FrameType::Rows
: r == 1 ? WindowFrame::FrameType::Range
: WindowFrame::FrameType::Groups;
break;
@ -340,44 +340,65 @@ void QueryFuzzer::fuzzWindowFrame(WindowFrame & frame)
case 1:
{
const auto r = fuzz_rand() % 3;
frame.begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
def.frame_begin_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
if (def.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
// The offsets are fuzzed normally through 'children'.
def.frame_begin_offset
= std::make_shared<ASTLiteral>(getRandomField(0));
}
else
{
def.frame_begin_offset = nullptr;
}
break;
}
case 2:
{
const auto r = fuzz_rand() % 3;
frame.end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
def.frame_end_type = r == 0 ? WindowFrame::BoundaryType::Unbounded
: r == 1 ? WindowFrame::BoundaryType::Current
: WindowFrame::BoundaryType::Offset;
break;
}
case 3:
{
frame.begin_offset = getRandomField(0).get<Int64>();
break;
}
case 4:
{
frame.end_offset = getRandomField(0).get<Int64>();
if (def.frame_end_type == WindowFrame::BoundaryType::Offset)
{
def.frame_end_offset
= std::make_shared<ASTLiteral>(getRandomField(0));
}
else
{
def.frame_end_offset = nullptr;
}
break;
}
case 5:
{
frame.begin_preceding = fuzz_rand() % 2;
def.frame_begin_preceding = fuzz_rand() % 2;
break;
}
case 6:
{
frame.end_preceding = fuzz_rand() % 2;
def.frame_end_preceding = fuzz_rand() % 2;
break;
}
default:
break;
}
frame.is_default = (frame == WindowFrame{});
if (def.frame_type == WindowFrame::FrameType::Range
&& def.frame_begin_type == WindowFrame::BoundaryType::Unbounded
&& def.frame_begin_preceding
&& def.frame_end_type == WindowFrame::BoundaryType::Current)
{
def.frame_is_default = true; /* NOLINT clang-tidy could you just shut up please */
}
else
{
def.frame_is_default = false;
}
}
void QueryFuzzer::fuzz(ASTs & asts)
@ -462,7 +483,7 @@ void QueryFuzzer::fuzz(ASTPtr & ast)
auto & def = fn->window_definition->as<ASTWindowDefinition &>();
fuzzColumnLikeExpressionList(def.partition_by.get());
fuzzOrderByList(def.order_by.get());
fuzzWindowFrame(def.frame);
fuzzWindowFrame(def);
}
fuzz(fn->children);

View File

@ -17,7 +17,7 @@ namespace DB
class ASTExpressionList;
class ASTOrderByElement;
struct WindowFrame;
struct ASTWindowDefinition;
/*
* This is an AST-based query fuzzer that makes random modifications to query
@ -69,7 +69,7 @@ struct QueryFuzzer
void fuzzOrderByElement(ASTOrderByElement * elem);
void fuzzOrderByList(IAST * ast);
void fuzzColumnLikeExpressionList(IAST * ast);
void fuzzWindowFrame(WindowFrame & frame);
void fuzzWindowFrame(ASTWindowDefinition & def);
void fuzz(ASTs & asts);
void fuzz(ASTPtr & ast);
void collectFuzzInfoMain(const ASTPtr ast);

File diff suppressed because it is too large Load Diff

View File

@ -18,12 +18,13 @@ public:
ClusterCopier(const String & task_path_,
const String & host_id_,
const String & proxy_database_name_,
ContextMutablePtr context_)
ContextMutablePtr context_,
Poco::Logger * log_)
: WithMutableContext(context_),
task_zookeeper_path(task_path_),
host_id(host_id_),
working_database_name(proxy_database_name_),
log(&Poco::Logger::get("ClusterCopier")) {}
log(log_) {}
void init();
@ -117,14 +118,14 @@ protected:
TaskStatus tryMoveAllPiecesToDestinationTable(const TaskTable & task_table, const String & partition_name);
/// Removes MATERIALIZED and ALIAS columns from create table query
static ASTPtr removeAliasColumnsFromCreateQuery(const ASTPtr & query_ast);
static ASTPtr removeAliasMaterializedAndTTLColumnsFromCreateQuery(const ASTPtr & query_ast, bool allow_to_copy_alias_and_materialized_columns);
bool tryDropPartitionPiece(ShardPartition & task_partition, size_t current_piece_number,
const zkutil::ZooKeeperPtr & zookeeper, const CleanStateClock & clean_state_clock);
static constexpr UInt64 max_table_tries = 3;
static constexpr UInt64 max_shard_partition_tries = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 3;
static constexpr UInt64 max_shard_partition_piece_tries_for_alter = 10;
bool tryProcessTable(const ConnectionTimeouts & timeouts, TaskTable & task_table);
@ -189,9 +190,7 @@ protected:
const ClusterPtr & cluster,
const String & query,
const Settings & current_settings,
PoolMode pool_mode = PoolMode::GET_ALL,
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD,
UInt64 max_successful_executions_per_shard = 0) const;
ClusterExecutionMode execution_mode = ClusterExecutionMode::ON_EACH_SHARD) const;
private:
String task_zookeeper_path;
@ -208,7 +207,6 @@ private:
ConfigurationPtr task_cluster_initial_config;
ConfigurationPtr task_cluster_current_config;
Coordination::Stat task_description_current_stat{};
std::unique_ptr<TaskCluster> task_cluster;

View File

@ -22,8 +22,9 @@ void ClusterCopierApp::initialize(Poco::Util::Application & self)
config_xml_path = config().getString("config-file");
task_path = config().getString("task-path");
log_level = config().getString("log-level", "trace");
log_level = config().getString("log-level", "info");
is_safe_mode = config().has("safe-mode");
is_status_mode = config().has("status");
if (config().has("copy-fault-probability"))
copy_fault_probability = std::max(std::min(config().getDouble("copy-fault-probability"), 1.0), 0.0);
if (config().has("move-fault-probability"))
@ -97,6 +98,7 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
.argument("base-dir").binding("base-dir"));
options.addOption(Poco::Util::Option("experimental-use-sample-offset", "", "Use SAMPLE OFFSET query instead of cityHash64(PRIMARY KEY) % n == k")
.argument("experimental-use-sample-offset").binding("experimental-use-sample-offset"));
options.addOption(Poco::Util::Option("status", "", "Get for status for current execution").binding("status"));
using Me = std::decay_t<decltype(*this)>;
options.addOption(Poco::Util::Option("help", "", "produce this help message").binding("help")
@ -106,6 +108,25 @@ void ClusterCopierApp::defineOptions(Poco::Util::OptionSet & options)
void ClusterCopierApp::mainImpl()
{
/// Status command
{
if (is_status_mode)
{
SharedContextHolder shared_context = Context::createShared();
auto context = Context::createGlobal(shared_context.get());
context->makeGlobalContext();
SCOPE_EXIT_SAFE(context->shutdown());
auto zookeeper = context->getZooKeeper();
auto status_json = zookeeper->get(task_path + "/status");
LOG_INFO(&logger(), "{}", status_json);
std::cout << status_json << std::endl;
context->resetZooKeeper();
return;
}
}
StatusFile status_file(process_path + "/status", StatusFile::write_full_info);
ThreadStatus thread_status;
@ -136,7 +157,7 @@ void ClusterCopierApp::mainImpl()
/// Initialize query scope just in case.
CurrentThread::QueryScope query_scope(context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context);
auto copier = std::make_unique<ClusterCopier>(task_path, host_id, default_database, context, log);
copier->setSafeMode(is_safe_mode);
copier->setCopyFaultProbability(copy_fault_probability);
copier->setMoveFaultProbability(move_fault_probability);

View File

@ -76,8 +76,9 @@ private:
std::string config_xml_path;
std::string task_path;
std::string log_level = "trace";
std::string log_level = "info";
bool is_safe_mode = false;
bool is_status_mode = false;
double copy_fault_probability = 0.0;
double move_fault_probability = 0.0;
bool is_help = false;

View File

@ -0,0 +1,65 @@
#pragma once
#include <Poco/JSON/Parser.h>
#include <Poco/JSON/JSON.h>
#include <Poco/JSON/Object.h>
#include <Poco/JSON/Stringifier.h>
#include <unordered_map>
#include <memory>
#include <string>
#include <iostream>
namespace DB
{
class StatusAccumulator
{
public:
struct TableStatus
{
size_t all_partitions_count;
size_t processed_partitions_count;
};
using Map = std::unordered_map<std::string, TableStatus>;
using MapPtr = std::shared_ptr<Map>;
static MapPtr fromJSON(std::string state_json)
{
Poco::JSON::Parser parser;
auto state = parser.parse(state_json).extract<Poco::JSON::Object::Ptr>();
MapPtr result_ptr = std::make_shared<Map>();
for (const auto & table_name : state->getNames())
{
auto table_status_json = state->getValue<std::string>(table_name);
auto table_status = parser.parse(table_status_json).extract<Poco::JSON::Object::Ptr>();
/// Map entry will be created if it is absent
auto & map_table_status = (*result_ptr)[table_name];
map_table_status.all_partitions_count += table_status->getValue<size_t>("all_partitions_count");
map_table_status.processed_partitions_count += table_status->getValue<size_t>("processed_partitions_count");
}
return result_ptr;
}
static std::string serializeToJSON(MapPtr statuses)
{
Poco::JSON::Object result_json;
for (const auto & [table_name, table_status] : *statuses)
{
Poco::JSON::Object status_json;
status_json.set("all_partitions_count", table_status.all_partitions_count);
status_json.set("processed_partitions_count", table_status.processed_partitions_count);
result_json.set(table_name, status_json);
}
std::ostringstream oss; // STYLE_CHECK_ALLOW_STD_STRING_STREAM
oss.exceptions(std::ios::failbit);
Poco::JSON::Stringifier::stringify(result_json, oss);
auto result = oss.str();
return result;
}
};
}

View File

@ -77,6 +77,8 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
if (config.has(prefix + "settings"))
settings_common.loadSettingsFromConfig(prefix + "settings", config);
settings_common.prefer_localhost_replica = 0;
settings_pull = settings_common;
if (config.has(prefix + "settings_pull"))
settings_pull.loadSettingsFromConfig(prefix + "settings_pull", config);
@ -92,11 +94,15 @@ inline void DB::TaskCluster::reloadSettings(const Poco::Util::AbstractConfigurat
/// Override important settings
settings_pull.readonly = 1;
settings_push.insert_distributed_sync = 1;
settings_pull.prefer_localhost_replica = false;
settings_push.insert_distributed_sync = true;
settings_push.prefer_localhost_replica = false;
set_default_value(settings_pull.load_balancing, LoadBalancing::NEAREST_HOSTNAME);
set_default_value(settings_pull.max_threads, 1);
set_default_value(settings_pull.max_block_size, 8192UL);
set_default_value(settings_pull.preferred_block_size_bytes, 0);
set_default_value(settings_push.insert_distributed_timeout, 0);
set_default_value(settings_push.replication_alter_partitions_sync, 2);
}

View File

@ -36,27 +36,33 @@ struct TaskTable
String getPartitionAttachIsDonePath(const String & partition_name) const;
String getPartitionPiecePath(const String & partition_name, const size_t piece_number) const;
String getPartitionPiecePath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsDirtyPath(const String & partition_name) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsDirtyPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionIsCleanedPath(const String & partition_name) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceIsCleanedPath(const String & partition_name, size_t piece_number) const;
String getCertainPartitionTaskStatusPath(const String & partition_name) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, const size_t piece_number) const;
String getCertainPartitionPieceTaskStatusPath(const String & partition_name, size_t piece_number) const;
bool isReplicatedTable() const { return is_replicated_table; }
/// These nodes are used for check-status option
String getStatusAllPartitionCount() const;
String getStatusProcessedPartitionsCount() const;
/// Partitions will be split into number-of-splits pieces.
/// Each piece will be copied independently. (10 by default)
size_t number_of_splits;
bool allow_to_copy_alias_and_materialized_columns{false};
bool allow_to_drop_target_partitions{false};
String name_in_config;
/// Used as task ID
@ -83,7 +89,7 @@ struct TaskTable
String engine_push_zk_path;
bool is_replicated_table;
ASTPtr rewriteReplicatedCreateQueryToPlain();
ASTPtr rewriteReplicatedCreateQueryToPlain() const;
/*
* A Distributed table definition used to split data
@ -181,6 +187,7 @@ struct TaskShard
/// Last CREATE TABLE query of the table of the shard
ASTPtr current_pull_table_create_query;
ASTPtr current_push_table_create_query;
/// Internal distributed tables
DatabaseAndTableName table_read_shard;
@ -242,6 +249,16 @@ inline String TaskTable::getCertainPartitionPieceTaskStatusPath(const String & p
return getPartitionPiecePath(partition_name, piece_number) + "/shards";
}
inline String TaskTable::getStatusAllPartitionCount() const
{
return task_cluster.task_zookeeper_path + "/status/all_partitions_count";
}
inline String TaskTable::getStatusProcessedPartitionsCount() const
{
return task_cluster.task_zookeeper_path + "/status/processed_partitions_count";
}
inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConfiguration & config,
const String & prefix_, const String & table_key)
: task_cluster(parent)
@ -250,7 +267,10 @@ inline TaskTable::TaskTable(TaskCluster & parent, const Poco::Util::AbstractConf
name_in_config = table_key;
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 10);
number_of_splits = config.getUInt64(table_prefix + "number_of_splits", 3);
allow_to_copy_alias_and_materialized_columns = config.getBool(table_prefix + "allow_to_copy_alias_and_materialized_columns", false);
allow_to_drop_target_partitions = config.getBool(table_prefix + "allow_to_drop_target_partitions", false);
cluster_pull_name = config.getString(table_prefix + "cluster_pull");
cluster_push_name = config.getString(table_prefix + "cluster_push");
@ -343,7 +363,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
std::uniform_int_distribution<UInt8> get_urand(0, std::numeric_limits<UInt8>::max());
// Compute the priority
for (auto & shard_info : cluster_pull->getShardsInfo())
for (const auto & shard_info : cluster_pull->getShardsInfo())
{
TaskShardPtr task_shard = std::make_shared<TaskShard>(*this, shard_info);
const auto & replicas = cluster_pull->getShardsAddresses().at(task_shard->indexInCluster());
@ -369,7 +389,7 @@ inline void TaskTable::initShards(RandomEngine && random_engine)
local_shards.assign(all_shards.begin(), it_first_remote);
}
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain() const
{
ASTPtr prev_engine_push_ast = engine_push_ast->clone();
@ -383,9 +403,15 @@ inline ASTPtr TaskTable::rewriteReplicatedCreateQueryToPlain()
{
auto & replicated_table_arguments = new_engine_ast.arguments->children;
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
/// In some cases of Atomic database engine usage ReplicatedMergeTree tables
/// could be created without arguments.
if (!replicated_table_arguments.empty())
{
/// Delete first two arguments of Replicated...MergeTree() table.
replicated_table_arguments.erase(replicated_table_arguments.begin());
replicated_table_arguments.erase(replicated_table_arguments.begin());
}
}
return new_storage_ast.clone();
@ -400,7 +426,7 @@ inline String DB::TaskShard::getDescription() const
inline String DB::TaskShard::getHostNameExample() const
{
auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
const auto & replicas = task_table.cluster_pull->getShardsAddresses().at(indexInCluster());
return replicas.at(0).readableString();
}

View File

@ -19,6 +19,7 @@
#include <Common/StringUtils/StringUtils.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/IO.h>
#include <common/phdr_cache.h>
#include <ext/scope_guard.h>
@ -172,11 +173,11 @@ enum class InstructionFail
AVX512 = 8
};
std::pair<const char *, size_t> instructionFailToString(InstructionFail fail)
auto instructionFailToString(InstructionFail fail)
{
switch (fail)
{
#define ret(x) return std::make_pair(x, ARRAY_SIZE(x) - 1)
#define ret(x) return std::make_tuple(STDERR_FILENO, x, ARRAY_SIZE(x) - 1)
case InstructionFail::NONE:
ret("NONE");
case InstructionFail::SSE3:
@ -260,28 +261,12 @@ void checkRequiredInstructionsImpl(volatile InstructionFail & fail)
fail = InstructionFail::NONE;
}
/// This function is safe to use in static initializers.
void writeErrorLen(const char * data, size_t size)
{
while (size != 0)
{
ssize_t res = ::write(STDERR_FILENO, data, size);
if ((-1 == res || 0 == res) && errno != EINTR)
_Exit(1);
if (res > 0)
{
data += res;
size -= res;
}
}
}
/// Macros to avoid using strlen(), since it may fail if SSE is not supported.
#define writeError(data) do \
{ \
static_assert(__builtin_constant_p(data)); \
writeErrorLen(data, ARRAY_SIZE(data) - 1); \
if (!writeRetry(STDERR_FILENO, data, ARRAY_SIZE(data) - 1)) \
_Exit(1); \
} while (false)
/// Check SSE and others instructions availability. Calls exit on fail.
@ -310,7 +295,8 @@ void checkRequiredInstructions()
if (sigsetjmp(jmpbuf, 1))
{
writeError("Instruction check fail. The CPU does not support ");
std::apply(writeErrorLen, instructionFailToString(fail));
if (!std::apply(writeRetry, instructionFailToString(fail)))
_Exit(1);
writeError(" instruction set.\n");
_Exit(1);
}

View File

@ -637,7 +637,7 @@ struct AggregateFunctionAnyLastData : Data
template <typename Data>
struct AggregateFunctionAnyHeavyData : Data
{
size_t counter = 0;
UInt64 counter = 0;
using Self = AggregateFunctionAnyHeavyData;

View File

@ -11,6 +11,7 @@
#include <algorithm>
#include <functional>
#include <filesystem>
#include <boost/algorithm/string.hpp>
#include <Poco/DOM/Text.h>
#include <Poco/DOM/Attr.h>
#include <Poco/DOM/Comment.h>
@ -36,6 +37,7 @@ namespace DB
namespace ErrorCodes
{
extern const int FILE_DOESNT_EXIST;
extern const int CANNOT_LOAD_CONFIG;
}
/// For cutting preprocessed path to this base
@ -437,6 +439,8 @@ ConfigProcessor::Files ConfigProcessor::getConfigMergeFiles(const std::string &
std::string extension = path.extension();
std::string base_name = path.stem();
boost::algorithm::to_lower(extension);
// Skip non-config and temporary files
if (fs::is_regular_file(path)
&& (extension == ".xml" || extension == ".conf" || extension == ".yaml" || extension == ".yml")
@ -462,13 +466,21 @@ XMLDocumentPtr ConfigProcessor::processConfig(
if (fs::exists(path))
{
fs::path p(path);
if (p.extension() == ".xml")
std::string extension = p.extension();
boost::algorithm::to_lower(extension);
if (extension == ".yaml" || extension == ".yml")
{
config = YAMLParser::parse(path);
}
else if (extension == ".xml" || extension == ".conf" || extension.empty())
{
config = dom_parser.parse(path);
}
else if (p.extension() == ".yaml" || p.extension() == ".yml")
else
{
config = YAMLParser::parse(path);
throw Exception(ErrorCodes::CANNOT_LOAD_CONFIG, "Unknown format of '{}' config", path);
}
}
else
@ -507,7 +519,10 @@ XMLDocumentPtr ConfigProcessor::processConfig(
XMLDocumentPtr with;
fs::path p(merge_file);
if (p.extension() == ".yaml" || p.extension() == ".yml")
std::string extension = p.extension();
boost::algorithm::to_lower(extension);
if (extension == ".yaml" || extension == ".yml")
{
with = YAMLParser::parse(merge_file);
}

View File

@ -269,7 +269,6 @@ public:
void operator() (const AggregateFunctionStateData & x) const;
};
template <typename T> constexpr bool isDecimalField() { return false; }
template <> constexpr bool isDecimalField<DecimalField<Decimal32>>() { return true; }
template <> constexpr bool isDecimalField<DecimalField<Decimal64>>() { return true; }

27
src/Common/IO.cpp Normal file
View File

@ -0,0 +1,27 @@
#include <Common/IO.h>
#include <unistd.h>
#include <errno.h>
#include <cstring>
bool writeRetry(int fd, const char * data, size_t size)
{
if (!size)
size = strlen(data);
while (size != 0)
{
ssize_t res = ::write(fd, data, size);
if ((-1 == res || 0 == res) && errno != EINTR)
return false;
if (res > 0)
{
data += res;
size -= res;
}
}
return true;
}

13
src/Common/IO.h Normal file
View File

@ -0,0 +1,13 @@
#pragma once
#include <cstddef>
/// IO helpers
/// Write loop with EINTR handling.
///
/// This function is safe to use in static initializers.
///
/// @param size - len of @data or 0 to use strlen()
/// @return true if write was succeed, otherwise false.
bool writeRetry(int fd, const char * data, size_t size = 0);

View File

@ -46,6 +46,7 @@ SRCS(
ExternalLoaderStatus.cpp
FieldVisitors.cpp
FileChecker.cpp
IO.cpp
IPv6ToBinary.cpp
IntervalKind.cpp
JSONBuilder.cpp

View File

@ -5,7 +5,7 @@ LIBRARY()
ADDINCL(
contrib/libs/lz4
contrib/libs/zstd
contrib/libs/zstd/include
)
PEERDIR(

View File

@ -4,7 +4,7 @@ LIBRARY()
ADDINCL(
contrib/libs/lz4
contrib/libs/zstd
contrib/libs/zstd/include
)
PEERDIR(

View File

@ -80,7 +80,7 @@ class IColumn;
M(UInt64, background_pool_size, 16, "Number of threads performing background work for tables (for example, merging in merge tree). Only has meaning at server startup.", 0) \
M(UInt64, background_move_pool_size, 8, "Number of threads performing background moves for tables. Only has meaning at server startup.", 0) \
M(UInt64, background_fetches_pool_size, 8, "Number of threads performing background fetches for replicated tables. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 16, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_schedule_pool_size, 128, "Number of threads performing background tasks for replicated tables, dns cache updates. Only has meaning at server startup.", 0) \
M(UInt64, background_message_broker_schedule_pool_size, 16, "Number of threads performing background tasks for message streaming. Only has meaning at server startup.", 0) \
M(UInt64, background_distributed_schedule_pool_size, 16, "Number of threads performing background tasks for distributed sends. Only has meaning at server startup.", 0) \
M(UInt64, max_replicated_fetches_network_bandwidth_for_server, 0, "The maximum speed of data exchange over the network in bytes per second for replicated fetches. Zero means unlimited. Only has meaning at server startup.", 0) \
@ -403,6 +403,7 @@ class IColumn;
M(Bool, optimize_if_chain_to_multiif, false, "Replace if(cond1, then1, if(cond2, ...)) chains to multiIf. Currently it's not beneficial for numeric types.", 0) \
M(Bool, optimize_if_transform_strings_to_enum, false, "Replaces string-type arguments in If and Transform to enum. Disabled by default cause it could make inconsistent change in distributed query that would lead to its fail.", 0) \
M(Bool, optimize_monotonous_functions_in_order_by, true, "Replace monotonous function with its argument in ORDER BY", 0) \
M(Bool, optimize_functions_to_subcolumns, false, "Transform functions to subcolumns, if possible, to reduce amount of read data. E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null' ", 0) \
M(Bool, normalize_function_names, true, "Normalize function names to their canonical names", 0) \
M(Bool, allow_experimental_alter_materialized_view_structure, false, "Allow atomic alter on Materialized views. Work in progress.", 0) \
M(Bool, enable_early_constant_folding, true, "Enable query optimization where we analyze function and subqueries results and rewrite query if there're constants there", 0) \

View File

@ -26,6 +26,8 @@ inline StringRef checkAndReturnHost(const Pos & pos, const Pos & dot_pos, const
}
/// Extracts host from given url.
///
/// @return empty StringRef if the host is not valid (i.e. it does not have dot, or there no symbol after dot).
inline StringRef getURLHost(const char * data, size_t size)
{
Pos pos = data;

View File

@ -28,7 +28,10 @@ struct ExtractTopLevelDomain
return;
/// For IPv4 addresses select nothing.
if (last_dot[1] <= '9')
///
/// NOTE: it is safe to access last_dot[1]
/// since getURLHost() will not return a host if there is symbol after dot.
if (isNumericASCII(last_dot[1]))
return;
res_data = last_dot + 1;

View File

@ -182,18 +182,20 @@ public:
ColumnPtr executeImpl(const ColumnsWithTypeAndName & arguments, const DataTypePtr & result_type, size_t input_rows_count) const override
{
const ColumnMap * col_map = typeid_cast<const ColumnMap *>(arguments[0].column.get());
bool is_const = isColumnConst(*arguments[0].column);
const ColumnMap * col_map = is_const ? checkAndGetColumnConstData<ColumnMap>(arguments[0].column.get()) : checkAndGetColumn<ColumnMap>(arguments[0].column.get());
if (!col_map)
return nullptr;
throw Exception{"First argument for function " + getName() + " must be a map", ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT};
const auto & nested_column = col_map->getNestedColumn();
const auto & keys_data = col_map->getNestedData().getColumn(0);
/// Prepare arguments to call arrayIndex for check has the array element.
ColumnPtr column_array = ColumnArray::create(keys_data.getPtr(), nested_column.getOffsetsPtr());
ColumnsWithTypeAndName new_arguments =
{
{
ColumnArray::create(keys_data.getPtr(), nested_column.getOffsetsPtr()),
is_const ? ColumnConst::create(std::move(column_array), keys_data.size()) : std::move(column_array),
std::make_shared<DataTypeArray>(result_type),
""
},

View File

@ -4,7 +4,7 @@ OWNER(g:clickhouse)
LIBRARY()
ADDINCL(
contrib/libs/zstd
contrib/libs/zstd/include
contrib/restricted/fast_float
)

View File

@ -3,7 +3,7 @@ OWNER(g:clickhouse)
LIBRARY()
ADDINCL(
contrib/libs/zstd
contrib/libs/zstd/include
contrib/restricted/fast_float
)

View File

@ -1894,11 +1894,11 @@ void NO_INLINE Aggregator::mergeWithoutKeyStreamsImpl(
res = place;
}
if (block.rows() > 0)
for (size_t row = 0, rows = block.rows(); row < rows; ++row)
{
/// Adding Values
for (size_t i = 0; i < params.aggregates_size; ++i)
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[0], result.aggregates_pool);
aggregate_functions[i]->merge(res + offsets_of_aggregate_states[i], (*aggregate_columns[i])[row], result.aggregates_pool);
}
/// Early release memory.

View File

@ -14,17 +14,18 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/IColumn.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ArrayJoinAction.h>
#include <Interpreters/Context.h>
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/evaluateConstantExpression.h>
#include <Interpreters/ExpressionActions.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExternalDictionariesLoader.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/Set.h>
#include <Interpreters/TableJoin.h>
#include <Interpreters/JoinSwitcher.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ExpressionStep.h>
@ -476,7 +477,8 @@ bool ExpressionAnalyzer::makeAggregateDescriptions(ActionsDAGPtr & actions)
return !aggregates().empty();
}
void makeWindowDescriptionFromAST(const WindowDescriptions & existing_descriptions,
void makeWindowDescriptionFromAST(const Context & context,
const WindowDescriptions & existing_descriptions,
WindowDescription & desc, const IAST * ast)
{
const auto & definition = ast->as<const ASTWindowDefinition &>();
@ -568,20 +570,35 @@ void makeWindowDescriptionFromAST(const WindowDescriptions & existing_descriptio
desc.full_sort_description.insert(desc.full_sort_description.end(),
desc.order_by.begin(), desc.order_by.end());
if (definition.frame.type != WindowFrame::FrameType::Rows
&& definition.frame.type != WindowFrame::FrameType::Range)
if (definition.frame_type != WindowFrame::FrameType::Rows
&& definition.frame_type != WindowFrame::FrameType::Range)
{
std::string name = definition.frame.type == WindowFrame::FrameType::Rows
? "ROWS"
: definition.frame.type == WindowFrame::FrameType::Groups
? "GROUPS" : "RANGE";
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Window frame '{}' is not implemented (while processing '{}')",
name, ast->formatForErrorMessage());
WindowFrame::toString(definition.frame_type),
ast->formatForErrorMessage());
}
desc.frame = definition.frame;
desc.frame.is_default = definition.frame_is_default;
desc.frame.type = definition.frame_type;
desc.frame.begin_type = definition.frame_begin_type;
desc.frame.begin_preceding = definition.frame_begin_preceding;
desc.frame.end_type = definition.frame_end_type;
desc.frame.end_preceding = definition.frame_end_preceding;
if (definition.frame_end_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_end_offset,
context.shared_from_this());
desc.frame.end_offset = value;
}
if (definition.frame_begin_type == WindowFrame::BoundaryType::Offset)
{
auto [value, _] = evaluateConstantExpression(definition.frame_begin_offset,
context.shared_from_this());
desc.frame.begin_offset = value;
}
}
void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
@ -607,7 +624,8 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
const auto & elem = ptr->as<const ASTWindowListElement &>();
WindowDescription desc;
desc.window_name = elem.name;
makeWindowDescriptionFromAST(window_descriptions, desc, elem.definition.get());
makeWindowDescriptionFromAST(*getContext(), window_descriptions,
desc, elem.definition.get());
auto [it, inserted] = window_descriptions.insert(
{desc.window_name, desc});
@ -692,7 +710,8 @@ void ExpressionAnalyzer::makeWindowDescriptions(ActionsDAGPtr actions)
const ASTWindowDefinition &>();
WindowDescription desc;
desc.window_name = definition.getDefaultWindowName();
makeWindowDescriptionFromAST(window_descriptions, desc, &definition);
makeWindowDescriptionFromAST(*getContext(), window_descriptions,
desc, &definition);
auto [it, inserted] = window_descriptions.insert(
{desc.window_name, desc});

View File

@ -85,7 +85,8 @@ public:
/// If this is already an external table, you do not need to add anything. Just remember its presence.
auto temporary_table_name = getIdentifierName(subquery_or_table_name);
bool exists_in_local_map = external_tables.end() != external_tables.find(temporary_table_name);
bool exists_in_context = getContext()->tryResolveStorageID(StorageID("", temporary_table_name), Context::ResolveExternal);
bool exists_in_context = static_cast<bool>(getContext()->tryResolveStorageID(
StorageID("", temporary_table_name), Context::ResolveExternal));
if (exists_in_local_map || exists_in_context)
return;
}

View File

@ -44,7 +44,8 @@ BlockInputStreamPtr InterpreterExistsQuery::executeImpl()
{
if (exists_query->temporary)
{
result = getContext()->tryResolveStorageID({"", exists_query->table}, Context::ResolveExternal);
result = static_cast<bool>(getContext()->tryResolveStorageID(
{"", exists_query->table}, Context::ResolveExternal));
}
else
{

View File

@ -91,7 +91,20 @@ public:
{
if (should_add_column_predicate(column.name))
{
auto identifier = std::make_shared<ASTIdentifier>(std::vector<String>{it->first, column.name});
ASTPtr identifier;
if (it->first.empty())
/// We want tables from JOIN to have aliases.
/// But it is possible to set joined_subquery_requires_alias = 0,
/// and write a query like `select * FROM (SELECT 1), (SELECT 1), (SELECT 1)`.
/// If so, table name will be empty here.
///
/// We cannot create compound identifier with empty part (there is an assert).
/// So, try our luck and use only column name.
/// (Rewriting AST for JOIN is not an efficient design).
identifier = std::make_shared<ASTIdentifier>(column.name);
else
identifier = std::make_shared<ASTIdentifier>(std::vector<String>{it->first, column.name});
new_select_expression_list->children.emplace_back(std::move(identifier));
}
}

View File

@ -80,6 +80,9 @@ void QueryNormalizer::visit(ASTIdentifier & node, ASTPtr & ast, Data & data)
/// If it is an alias, but not a parent alias (for constructs like "SELECT column + 1 AS column").
auto it_alias = data.aliases.find(node.name());
if (!data.allow_self_aliases && current_alias == node.name())
throw Exception(ErrorCodes::CYCLIC_ALIASES, "Self referencing of {} to {}. Cyclic alias", backQuote(current_alias), backQuote(node.name()));
if (it_alias != data.aliases.end() && current_alias != node.name())
{
if (!IdentifierSemantic::canBeAlias(node))

View File

@ -48,18 +48,22 @@ public:
MapOfASTs finished_asts; /// already processed vertices (and by what they replaced)
SetOfASTs current_asts; /// vertices in the current call stack of this method
std::string current_alias; /// the alias referencing to the ancestor of ast (the deepest ancestor with aliases)
bool ignore_alias; /// normalize query without any aliases
const bool ignore_alias; /// normalize query without any aliases
Data(const Aliases & aliases_, const NameSet & source_columns_set_, bool ignore_alias_, ExtractedSettings && settings_)
/// It's Ok to have "c + 1 AS c" in queries, but not in table definition
const bool allow_self_aliases; /// for constructs like "SELECT column + 1 AS column"
Data(const Aliases & aliases_, const NameSet & source_columns_set_, bool ignore_alias_, ExtractedSettings && settings_, bool allow_self_aliases_)
: aliases(aliases_)
, source_columns_set(source_columns_set_)
, settings(settings_)
, level(0)
, ignore_alias(ignore_alias_)
, allow_self_aliases(allow_self_aliases_)
{}
};
QueryNormalizer(Data & data)
explicit QueryNormalizer(Data & data)
: visitor_data(data)
{}

View File

@ -0,0 +1,80 @@
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <DataTypes/NestedUtils.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTLiteral.h>
namespace DB
{
namespace
{
ASTPtr transformToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
return std::make_shared<ASTIdentifier>(Nested::concatenateName(name_in_storage, subcolumn_name));
}
ASTPtr transformEmptyToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("equals", ast, std::make_shared<ASTLiteral>(0u));
}
ASTPtr transformNotEmptyToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("notEquals", ast, std::make_shared<ASTLiteral>(0u));
}
ASTPtr transformIsNotNullToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("not", ast);
}
ASTPtr transformCountNullableToSubcolumn(const String & name_in_storage, const String & subcolumn_name)
{
auto ast = transformToSubcolumn(name_in_storage, subcolumn_name);
return makeASTFunction("sum", makeASTFunction("not", ast));
}
const std::unordered_map<String, std::tuple<TypeIndex, String, decltype(&transformToSubcolumn)>> function_to_subcolumn =
{
{"length", {TypeIndex::Array, "size0", transformToSubcolumn}},
{"empty", {TypeIndex::Array, "size0", transformEmptyToSubcolumn}},
{"notEmpty", {TypeIndex::Array, "size0", transformNotEmptyToSubcolumn}},
{"isNull", {TypeIndex::Nullable, "null", transformToSubcolumn}},
{"isNotNull", {TypeIndex::Nullable, "null", transformIsNotNullToSubcolumn}},
{"count", {TypeIndex::Nullable, "null", transformCountNullableToSubcolumn}},
{"mapKeys", {TypeIndex::Map, "keys", transformToSubcolumn}},
{"mapValues", {TypeIndex::Map, "values", transformToSubcolumn}},
};
}
void RewriteFunctionToSubcolumnData::visit(ASTFunction & function, ASTPtr & ast) const
{
const auto & arguments = function.arguments->children;
if (arguments.size() != 1)
return;
const auto * identifier = arguments[0]->as<ASTIdentifier>();
if (!identifier)
return;
auto it = function_to_subcolumn.find(function.name);
if (it == function_to_subcolumn.end())
return;
const auto & [type_id, subcolumn_name, transformer] = it->second;
const auto & columns = metadata_snapshot->getColumns();
const auto & name_in_storage = identifier->name();
if (columns.has(name_in_storage)
&& columns.get(name_in_storage).type->getTypeId() == type_id)
{
ast = transformer(name_in_storage, subcolumn_name);
}
}
}

View File

@ -0,0 +1,24 @@
#pragma once
#include <Parsers/ASTFunction.h>
#include <Interpreters/InDepthNodeVisitor.h>
#include <Storages/StorageInMemoryMetadata.h>
namespace DB
{
/// Rewrites functions to subcolumns, if possible, to reduce amount of read data.
/// E.g. 'length(arr)' -> 'arr.size0', 'col IS NULL' -> 'col.null'
class RewriteFunctionToSubcolumnData
{
public:
using TypeToVisit = ASTFunction;
void visit(ASTFunction & function, ASTPtr & ast) const;
StorageMetadataPtr metadata_snapshot;
};
using RewriteFunctionToSubcolumnMatcher = OneTypeMatcher<RewriteFunctionToSubcolumnData>;
using RewriteFunctionToSubcolumnVisitor = InDepthNodeVisitor<RewriteFunctionToSubcolumnMatcher, true>;
}

View File

@ -79,6 +79,15 @@ bool StorageID::operator<(const StorageID & rhs) const
return !hasUUID();
}
bool StorageID::operator==(const StorageID & rhs) const
{
assertNotEmpty();
if (hasUUID() && rhs.hasUUID())
return uuid == rhs.uuid;
else
return std::tie(database_name, table_name) == std::tie(rhs.database_name, rhs.table_name);
}
String StorageID::getFullTableName() const
{
return backQuoteIfNeed(getDatabaseName()) + "." + backQuoteIfNeed(table_name);

View File

@ -54,7 +54,7 @@ struct StorageID
String getNameForLogs() const;
operator bool () const
explicit operator bool () const
{
return !empty();
}
@ -70,6 +70,7 @@ struct StorageID
}
bool operator<(const StorageID & rhs) const;
bool operator==(const StorageID & rhs) const;
void assertNotEmpty() const
{

View File

@ -1,6 +1,7 @@
#include <Core/Settings.h>
#include <Interpreters/TreeOptimizer.h>
#include <Interpreters/TreeRewriter.h>
#include <Interpreters/OptimizeIfChains.h>
#include <Interpreters/OptimizeIfWithConstantConditionVisitor.h>
#include <Interpreters/ArithmeticOperationsInAgrFuncOptimize.h>
@ -14,6 +15,7 @@
#include <Interpreters/MonotonicityCheckVisitor.h>
#include <Interpreters/ConvertStringsToEnumVisitor.h>
#include <Interpreters/PredicateExpressionsOptimizer.h>
#include <Interpreters/RewriteFunctionToSubcolumnVisitor.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExternalDictionariesLoader.h>
@ -27,7 +29,7 @@
#include <Parsers/ASTTablesInSelectQuery.h>
#include <Functions/FunctionFactory.h>
#include <Storages/StorageInMemoryMetadata.h>
#include <Storages/IStorage.h>
#include <Interpreters/RewriteSumIfFunctionVisitor.h>
@ -579,6 +581,12 @@ void transformIfStringsIntoEnum(ASTPtr & query)
ConvertStringsToEnumVisitor(convert_data).visit(query);
}
void optimizeFunctionsToSubcolumns(ASTPtr & query, const StorageMetadataPtr & metadata_snapshot)
{
RewriteFunctionToSubcolumnVisitor::Data data{metadata_snapshot};
RewriteFunctionToSubcolumnVisitor(data).visit(query);
}
}
void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif)
@ -590,10 +598,8 @@ void TreeOptimizer::optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_
OptimizeIfChainsVisitor().visit(query);
}
void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns,
ContextConstPtr context, const StorageMetadataPtr & metadata_snapshot,
bool & rewrite_subqueries)
void TreeOptimizer::apply(ASTPtr & query, TreeRewriterResult & result,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns, ContextConstPtr context)
{
const auto & settings = context->getSettingsRef();
@ -601,17 +607,21 @@ void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & sou
if (!select_query)
throw Exception("Select analyze for not select asts.", ErrorCodes::LOGICAL_ERROR);
optimizeIf(query, aliases, settings.optimize_if_chain_to_multiif);
if (settings.optimize_functions_to_subcolumns && result.storage
&& result.storage->supportsSubcolumns() && result.metadata_snapshot)
optimizeFunctionsToSubcolumns(query, result.metadata_snapshot);
optimizeIf(query, result.aliases, settings.optimize_if_chain_to_multiif);
/// Move arithmetic operations out of aggregation functions
if (settings.optimize_arithmetic_operations_in_aggregate_functions)
optimizeAggregationFunctions(query);
/// Push the predicate expression down to the subqueries.
rewrite_subqueries = PredicateExpressionsOptimizer(context, tables_with_columns, settings).optimize(*select_query);
result.rewrite_subqueries = PredicateExpressionsOptimizer(context, tables_with_columns, settings).optimize(*select_query);
/// GROUP BY injective function elimination.
optimizeGroupBy(select_query, source_columns_set, context);
optimizeGroupBy(select_query, result.source_columns_set, context);
/// GROUP BY functions of other keys elimination.
if (settings.optimize_group_by_function_keys)
@ -658,7 +668,7 @@ void TreeOptimizer::apply(ASTPtr & query, Aliases & aliases, const NameSet & sou
/// Replace monotonous functions with its argument
if (settings.optimize_monotonous_functions_in_order_by)
optimizeMonotonousFunctionsInOrderBy(select_query, context, tables_with_columns,
metadata_snapshot ? metadata_snapshot->getSortingKeyColumns() : Names{});
result.metadata_snapshot ? result.metadata_snapshot->getSortingKeyColumns() : Names{});
/// Remove duplicate items from ORDER BY.
/// Execute it after all order by optimizations,

View File

@ -8,8 +8,7 @@
namespace DB
{
struct StorageInMemoryMetadata;
using StorageMetadataPtr = std::shared_ptr<const StorageInMemoryMetadata>;
struct TreeRewriterResult;
/// Part of of Tree Rewriter (SyntaxAnalyzer) that optimizes AST.
/// Query should be ready to execute either before either after it. But resulting query could be faster.
@ -18,12 +17,9 @@ class TreeOptimizer
public:
static void apply(
ASTPtr & query,
Aliases & aliases,
const NameSet & source_columns_set,
TreeRewriterResult & result,
const std::vector<TableWithColumnNamesAndTypes> & tables_with_columns,
ContextConstPtr context,
const StorageMetadataPtr & metadata_snapshot,
bool & rewrite_subqueries);
ContextConstPtr context);
static void optimizeIf(ASTPtr & query, Aliases & aliases, bool if_chain_to_multiif);
};

View File

@ -913,7 +913,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
all_source_columns_set.insert(name);
}
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings);
normalize(query, result.aliases, all_source_columns_set, select_options.ignore_alias, settings, /* allow_self_aliases = */ true);
/// Remove unneeded columns according to 'required_result_columns'.
/// Leave all selected columns in case of DISTINCT; columns that contain arrayJoin function inside.
@ -924,8 +924,7 @@ TreeRewriterResultPtr TreeRewriter::analyzeSelect(
/// Executing scalar subqueries - replacing them with constant values.
executeScalarSubqueries(query, getContext(), subquery_depth, result.scalars, select_options.only_analyze);
TreeOptimizer::apply(
query, result.aliases, source_columns_set, tables_with_columns, getContext(), result.metadata_snapshot, result.rewrite_subqueries);
TreeOptimizer::apply(query, result, tables_with_columns, getContext());
/// array_join_alias_to_name, array_join_result_to_source.
getArrayJoinedColumns(query, result, select_query, result.source_columns, source_columns_set);
@ -959,7 +958,8 @@ TreeRewriterResultPtr TreeRewriter::analyze(
const NamesAndTypesList & source_columns,
ConstStoragePtr storage,
const StorageMetadataPtr & metadata_snapshot,
bool allow_aggregations) const
bool allow_aggregations,
bool allow_self_aliases) const
{
if (query->as<ASTSelectQuery>())
throw Exception("Not select analyze for select asts.", ErrorCodes::LOGICAL_ERROR);
@ -968,7 +968,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
TreeRewriterResult result(source_columns, storage, metadata_snapshot, false);
normalize(query, result.aliases, result.source_columns_set, false, settings);
normalize(query, result.aliases, result.source_columns_set, false, settings, allow_self_aliases);
/// Executing scalar subqueries. Column defaults could be a scalar subquery.
executeScalarSubqueries(query, getContext(), 0, result.scalars, false);
@ -994,7 +994,7 @@ TreeRewriterResultPtr TreeRewriter::analyze(
}
void TreeRewriter::normalize(
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings)
ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases)
{
CustomizeCountDistinctVisitor::Data data_count_distinct{settings.count_distinct_implementation};
CustomizeCountDistinctVisitor(data_count_distinct).visit(query);
@ -1054,7 +1054,7 @@ void TreeRewriter::normalize(
FunctionNameNormalizer().visit(query.get());
/// Common subexpression elimination. Rewrite rules.
QueryNormalizer::Data normalizer_data(aliases, source_columns_set, ignore_alias, settings);
QueryNormalizer::Data normalizer_data(aliases, source_columns_set, ignore_alias, settings, allow_self_aliases);
QueryNormalizer(normalizer_data).visit(query);
}

View File

@ -103,7 +103,8 @@ public:
const NamesAndTypesList & source_columns_,
ConstStoragePtr storage = {},
const StorageMetadataPtr & metadata_snapshot = {},
bool allow_aggregations = false) const;
bool allow_aggregations = false,
bool allow_self_aliases = true) const;
/// Analyze and rewrite select query
TreeRewriterResultPtr analyzeSelect(
@ -115,7 +116,7 @@ public:
std::shared_ptr<TableJoin> table_join = {}) const;
private:
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings);
static void normalize(ASTPtr & query, Aliases & aliases, const NameSet & source_columns_set, bool ignore_alias, const Settings & settings, bool allow_self_aliases);
};
}

View File

@ -9,6 +9,21 @@
using namespace DB;
TEST(QueryNormalizer, SimpleLoopAlias)
{
String query = "a as a";
ParserExpressionList parser(false);
ASTPtr ast = parseQuery(parser, query, 0, 0);
Aliases aliases;
aliases["a"] = parseQuery(parser, "a as a", 0, 0)->children[0];
Settings settings;
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings, false);
EXPECT_THROW(QueryNormalizer(normalizer_data).visit(ast), Exception);
}
TEST(QueryNormalizer, SimpleCycleAlias)
{
String query = "a as b, b as a";
@ -20,6 +35,6 @@ TEST(QueryNormalizer, SimpleCycleAlias)
aliases["b"] = parseQuery(parser, "a as b", 0, 0)->children[0];
Settings settings;
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings);
QueryNormalizer::Data normalizer_data(aliases, {}, false, settings, true);
EXPECT_THROW(QueryNormalizer(normalizer_data).visit(ast), Exception);
}

View File

@ -113,8 +113,14 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form
if (i != 0)
settings.ostr << '.';
if (name_parts[i].empty())
children[j++]->formatImpl(settings, state, frame);
/// Some AST rewriting code, like IdentifierSemantic::setColumnLongName,
/// does not respect children of identifier.
/// Here we also ignore children if they are empty.
if (name_parts[i].empty() && j < children.size())
{
children[j]->formatImpl(settings, state, frame);
++j;
}
else
format_element(name_parts[i]);
}
@ -122,7 +128,7 @@ void ASTIdentifier::formatImplWithoutAlias(const FormatSettings & settings, Form
else
{
const auto & name = shortName();
if (name.empty())
if (name.empty() && !children.empty())
children.front()->formatImpl(settings, state, frame);
else
format_element(name);

View File

@ -26,7 +26,24 @@ ASTPtr ASTWindowDefinition::clone() const
result->children.push_back(result->order_by);
}
result->frame = frame;
result->frame_is_default = frame_is_default;
result->frame_type = frame_type;
result->frame_begin_type = frame_begin_type;
result->frame_begin_preceding = frame_begin_preceding;
result->frame_end_type = frame_end_type;
result->frame_end_preceding = frame_end_preceding;
if (frame_begin_offset)
{
result->frame_begin_offset = frame_begin_offset->clone();
result->children.push_back(result->frame_begin_offset);
}
if (frame_end_offset)
{
result->frame_end_offset = frame_end_offset->clone();
result->children.push_back(result->frame_end_offset);
}
return result;
}
@ -75,44 +92,42 @@ void ASTWindowDefinition::formatImpl(const FormatSettings & settings,
need_space = true;
}
if (!frame.is_default)
if (!frame_is_default)
{
if (need_space)
{
settings.ostr << " ";
}
settings.ostr << WindowFrame::toString(frame.type) << " BETWEEN ";
if (frame.begin_type == WindowFrame::BoundaryType::Current)
settings.ostr << WindowFrame::toString(frame_type) << " BETWEEN ";
if (frame_begin_type == WindowFrame::BoundaryType::Current)
{
settings.ostr << "CURRENT ROW";
}
else if (frame.begin_type == WindowFrame::BoundaryType::Unbounded)
else if (frame_begin_type == WindowFrame::BoundaryType::Unbounded)
{
settings.ostr << "UNBOUNDED PRECEDING";
}
else
{
settings.ostr << applyVisitor(FieldVisitorToString(),
frame.begin_offset);
frame_begin_offset->formatImpl(settings, state, format_frame);
settings.ostr << " "
<< (!frame.begin_preceding ? "FOLLOWING" : "PRECEDING");
<< (!frame_begin_preceding ? "FOLLOWING" : "PRECEDING");
}
settings.ostr << " AND ";
if (frame.end_type == WindowFrame::BoundaryType::Current)
if (frame_end_type == WindowFrame::BoundaryType::Current)
{
settings.ostr << "CURRENT ROW";
}
else if (frame.end_type == WindowFrame::BoundaryType::Unbounded)
else if (frame_end_type == WindowFrame::BoundaryType::Unbounded)
{
settings.ostr << "UNBOUNDED FOLLOWING";
}
else
{
settings.ostr << applyVisitor(FieldVisitorToString(),
frame.end_offset);
frame_end_offset->formatImpl(settings, state, format_frame);
settings.ostr << " "
<< (!frame.end_preceding ? "FOLLOWING" : "PRECEDING");
<< (!frame_end_preceding ? "FOLLOWING" : "PRECEDING");
}
}
}

View File

@ -16,8 +16,14 @@ struct ASTWindowDefinition : public IAST
ASTPtr order_by;
WindowFrame frame;
bool frame_is_default = true;
WindowFrame::FrameType frame_type = WindowFrame::FrameType::Range;
WindowFrame::BoundaryType frame_begin_type = WindowFrame::BoundaryType::Unbounded;
ASTPtr frame_begin_offset;
bool frame_begin_preceding = true;
WindowFrame::BoundaryType frame_end_type = WindowFrame::BoundaryType::Current;
ASTPtr frame_end_offset;
bool frame_end_preceding = false;
ASTPtr clone() const override;

View File

@ -539,23 +539,23 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
ParserKeyword keyword_groups("GROUPS");
ParserKeyword keyword_range("RANGE");
node->frame.is_default = false;
node->frame_is_default = false;
if (keyword_rows.ignore(pos, expected))
{
node->frame.type = WindowFrame::FrameType::Rows;
node->frame_type = WindowFrame::FrameType::Rows;
}
else if (keyword_groups.ignore(pos, expected))
{
node->frame.type = WindowFrame::FrameType::Groups;
node->frame_type = WindowFrame::FrameType::Groups;
}
else if (keyword_range.ignore(pos, expected))
{
node->frame.type = WindowFrame::FrameType::Range;
node->frame_type = WindowFrame::FrameType::Range;
}
else
{
/* No frame clause. */
node->frame.is_default = true;
node->frame_is_default = true;
return true;
}
@ -574,21 +574,19 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (keyword_current_row.ignore(pos, expected))
{
node->frame.begin_type = WindowFrame::BoundaryType::Current;
node->frame_begin_type = WindowFrame::BoundaryType::Current;
}
else
{
ParserLiteral parser_literal;
ASTPtr ast_literal;
ParserExpression parser_expression;
if (keyword_unbounded.ignore(pos, expected))
{
node->frame.begin_type = WindowFrame::BoundaryType::Unbounded;
node->frame_begin_type = WindowFrame::BoundaryType::Unbounded;
}
else if (parser_literal.parse(pos, ast_literal, expected))
else if (parser_expression.parse(pos, node->frame_begin_offset, expected))
{
const Field & value = ast_literal->as<ASTLiteral &>().value;
node->frame.begin_offset = value;
node->frame.begin_type = WindowFrame::BoundaryType::Offset;
// We will evaluate the expression for offset expression later.
node->frame_begin_type = WindowFrame::BoundaryType::Offset;
}
else
{
@ -597,12 +595,12 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (keyword_preceding.ignore(pos, expected))
{
node->frame.begin_preceding = true;
node->frame_begin_preceding = true;
}
else if (keyword_following.ignore(pos, expected))
{
node->frame.begin_preceding = false;
if (node->frame.begin_type == WindowFrame::BoundaryType::Unbounded)
node->frame_begin_preceding = false;
if (node->frame_begin_type == WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame start cannot be UNBOUNDED FOLLOWING");
@ -623,21 +621,19 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (keyword_current_row.ignore(pos, expected))
{
node->frame.end_type = WindowFrame::BoundaryType::Current;
node->frame_end_type = WindowFrame::BoundaryType::Current;
}
else
{
ParserLiteral parser_literal;
ASTPtr ast_literal;
ParserExpression parser_expression;
if (keyword_unbounded.ignore(pos, expected))
{
node->frame.end_type = WindowFrame::BoundaryType::Unbounded;
node->frame_end_type = WindowFrame::BoundaryType::Unbounded;
}
else if (parser_literal.parse(pos, ast_literal, expected))
else if (parser_expression.parse(pos, node->frame_end_offset, expected))
{
const Field & value = ast_literal->as<ASTLiteral &>().value;
node->frame.end_offset = value;
node->frame.end_type = WindowFrame::BoundaryType::Offset;
// We will evaluate the expression for offset expression later.
node->frame_end_type = WindowFrame::BoundaryType::Offset;
}
else
{
@ -646,8 +642,8 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
if (keyword_preceding.ignore(pos, expected))
{
node->frame.end_preceding = true;
if (node->frame.end_type == WindowFrame::BoundaryType::Unbounded)
node->frame_end_preceding = true;
if (node->frame_end_type == WindowFrame::BoundaryType::Unbounded)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Frame end cannot be UNBOUNDED PRECEDING");
@ -656,7 +652,7 @@ static bool tryParseFrameDefinition(ASTWindowDefinition * node, IParser::Pos & p
else if (keyword_following.ignore(pos, expected))
{
// Positive offset or UNBOUNDED FOLLOWING.
node->frame.end_preceding = false;
node->frame_end_preceding = false;
}
else
{

View File

@ -590,7 +590,7 @@ Block validateColumnsDefaultsAndGetSampleBlock(ASTPtr default_expr_list, const N
try
{
auto syntax_analyzer_result = TreeRewriter(context).analyze(default_expr_list, all_columns);
auto syntax_analyzer_result = TreeRewriter(context).analyze(default_expr_list, all_columns, {}, {}, false, /* allow_self_aliases = */ false);
const auto actions = ExpressionAnalyzer(default_expr_list, syntax_analyzer_result, context).getActions(true);
for (const auto & action : actions->getActions())
if (action.node->type == ActionsDAG::ActionType::ARRAY_JOIN)

View File

@ -417,7 +417,11 @@ void DistributedBlockOutputStream::writeSync(const Block & block)
/// Deferred initialization. Only for sync insertion.
initWritingJobs(block, start, end);
pool.emplace(remote_jobs_count + local_jobs_count);
size_t jobs_count = remote_jobs_count + local_jobs_count;
size_t max_threads = std::min<size_t>(settings.max_distributed_connections, jobs_count);
pool.emplace(/* max_threads_= */ max_threads,
/* max_free_threads_= */ max_threads,
/* queue_size_= */ jobs_count);
if (!throttler && (settings.max_network_bandwidth || settings.max_network_bytes))
{

View File

@ -8,9 +8,8 @@
namespace DB
{
MergeListElement::MergeListElement(const std::string & database_, const std::string & table_, const FutureMergedMutatedPart & future_part)
: database{database_}
, table{table_}
MergeListElement::MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part)
: table_id{table_id_}
, partition_id{future_part.part_info.partition_id}
, result_part_name{future_part.name}
, result_part_path{future_part.path}
@ -60,8 +59,8 @@ MergeListElement::MergeListElement(const std::string & database_, const std::str
MergeInfo MergeListElement::getInfo() const
{
MergeInfo res;
res.database = database;
res.table = table;
res.database = table_id.getDatabaseName();
res.table = table_id.getTableName();
res.result_part_name = result_part_name;
res.result_part_path = result_part_path;
res.partition_id = partition_id;

View File

@ -8,6 +8,7 @@
#include <Storages/MergeTree/MergeType.h>
#include <Storages/MergeTree/MergeAlgorithm.h>
#include <Storages/MergeTree/BackgroundProcessList.h>
#include <Interpreters/StorageID.h>
#include <boost/noncopyable.hpp>
#include <memory>
#include <list>
@ -54,8 +55,7 @@ struct FutureMergedMutatedPart;
struct MergeListElement : boost::noncopyable
{
const std::string database;
const std::string table;
const StorageID table_id;
std::string partition_id;
const std::string result_part_name;
@ -94,7 +94,7 @@ struct MergeListElement : boost::noncopyable
/// Detected after merge already started
std::atomic<MergeAlgorithm> merge_algorithm;
MergeListElement(const std::string & database, const std::string & table, const FutureMergedMutatedPart & future_part);
MergeListElement(const StorageID & table_id_, const FutureMergedMutatedPart & future_part);
MergeInfo getInfo() const;
@ -122,12 +122,13 @@ public:
--merges_with_ttl_counter;
}
void cancelPartMutations(const String & partition_id, Int64 mutation_version)
void cancelPartMutations(const StorageID & table_id, const String & partition_id, Int64 mutation_version)
{
std::lock_guard lock{mutex};
for (auto & merge_element : entries)
{
if ((partition_id.empty() || merge_element.partition_id == partition_id)
&& merge_element.table_id == table_id
&& merge_element.source_data_version < mutation_version
&& merge_element.result_data_version >= mutation_version)
merge_element.is_cancelled = true;

View File

@ -234,7 +234,6 @@ QueryPlanPtr MergeTreeDataSelectExecutor::read(
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.where()->clone());
if (given_select.prewhere())
select.setExpression(ASTSelectQuery::Expression::WHERE, given_select.prewhere()->clone());
// TODO will row policy filter work?
// After overriding the group by clause, we finish the possible aggregations directly
if (processed_stage >= QueryProcessingStage::Enum::WithMergeableState && given_select.groupBy())

View File

@ -14,11 +14,28 @@
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
namespace
{
/// This is a special visitor which is used to get partition ID.
/// Calculate hash for UUID the same way as for UInt128.
/// It worked this way until 21.5, and we cannot change it,
/// or partition ID will be different in case UUID is used in partition key.
/// (It is not recommended to use UUID as partition key).
class LegacyFieldVisitorHash : public FieldVisitorHash
{
public:
using FieldVisitorHash::FieldVisitorHash;
using FieldVisitorHash::operator();
void operator() (const UUID & x) const { FieldVisitorHash::operator()(x.toUnderType()); }
};
}
static std::unique_ptr<ReadBufferFromFileBase> openForReading(const DiskPtr & disk, const String & path)
{
return disk->readFile(path, std::min(size_t(DBMS_DEFAULT_BUFFER_SIZE), disk->getFileSize(path)));
@ -74,7 +91,7 @@ String MergeTreePartition::getID(const Block & partition_key_sample) const
}
SipHash hash;
FieldVisitorHash hashing_visitor(hash);
LegacyFieldVisitorHash hashing_visitor(hash);
for (const Field & field : value)
applyVisitor(hashing_visitor, field);

View File

@ -1042,7 +1042,7 @@ ClusterPtr StorageDistributed::skipUnusedShards(
if (!limit)
{
LOG_TRACE(log,
LOG_DEBUG(log,
"Number of values for sharding key exceeds optimize_skip_unused_shards_limit={}, "
"try to increase it, but note that this may increase query processing time.",
local_context->getSettingsRef().optimize_skip_unused_shards_limit);

View File

@ -625,7 +625,7 @@ CancellationCode StorageMergeTree::killMutation(const String & mutation_id)
if (!to_kill)
return CancellationCode::NotFound;
getContext()->getMergeList().cancelPartMutations({}, to_kill->block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), {}, to_kill->block_number);
to_kill->removeFile();
LOG_TRACE(log, "Cancelled part mutations and removed mutation file {}", mutation_id);
{
@ -817,9 +817,8 @@ bool StorageMergeTree::mergeSelectedParts(
auto & future_part = merge_mutate_entry.future_part;
Stopwatch stopwatch;
MutableDataPartPtr new_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
auto write_part_log = [&] (const ExecutionStatus & execution_status)
{
@ -964,9 +963,8 @@ std::shared_ptr<StorageMergeTree::MergeMutateSelectedEntry> StorageMergeTree::se
bool StorageMergeTree::mutateSelectedPart(const StorageMetadataPtr & metadata_snapshot, MergeMutateSelectedEntry & merge_mutate_entry, TableLockHolder & table_lock_holder)
{
auto & future_part = merge_mutate_entry.future_part;
auto table_id = getStorageID();
auto merge_list_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_part);
auto merge_list_entry = getContext()->getMergeList().insert(getStorageID(), future_part);
Stopwatch stopwatch;
MutableDataPartPtr new_part;

View File

@ -1726,7 +1726,7 @@ bool StorageReplicatedMergeTree::tryExecuteMerge(const LogEntry & entry)
auto table_id = getStorageID();
/// Add merge to list
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(table_id.database_name, table_id.table_name, future_merged_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_merged_part);
Transaction transaction(*this);
MutableDataPartPtr part;
@ -1871,9 +1871,7 @@ bool StorageReplicatedMergeTree::tryExecutePartMutation(const StorageReplicatedM
future_mutated_part.updatePath(*this, reserved_space);
future_mutated_part.type = source_part->getType();
auto table_id = getStorageID();
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(
table_id.database_name, table_id.table_name, future_mutated_part);
MergeList::EntryPtr merge_entry = getContext()->getMergeList().insert(getStorageID(), future_mutated_part);
Stopwatch stopwatch;
@ -5934,7 +5932,7 @@ CancellationCode StorageReplicatedMergeTree::killMutation(const String & mutatio
{
const String & partition_id = pair.first;
Int64 block_number = pair.second;
getContext()->getMergeList().cancelPartMutations(partition_id, block_number);
getContext()->getMergeList().cancelPartMutations(getStorageID(), partition_id, block_number);
}
return CancellationCode::CancelSent;
}

View File

@ -1852,7 +1852,7 @@ class ClickHouseInstance:
wait_duration = time.time() - start_time
logging.debug('{} log line matching "{}" appeared in a {} seconds'.format(repetitions, regexp, wait_duration))
logging.debug('{} log line(s) matching "{}" appeared in a {:.3f} seconds'.format(repetitions, regexp, wait_duration))
return wait_duration
def file_exists(self, path):
@ -2188,6 +2188,7 @@ class ClickHouseInstance:
odbc_bridge_volume = "- " + self.odbc_bridge_bin_path + ":/usr/share/clickhouse-odbc-bridge_fresh"
library_bridge_volume = "- " + self.library_bridge_bin_path + ":/usr/share/clickhouse-library-bridge_fresh"
with open(self.docker_compose_path, 'w') as docker_compose:
docker_compose.write(DOCKER_COMPOSE_TEMPLATE.format(
image=self.image,

View File

@ -0,0 +1,21 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source_trivial_cluster>
<shard>
<replica>
<host>first_trivial</host>
<port>9000</port>
</replica>
</shard>
</source_trivial_cluster>
<destination_trivial_cluster>
<shard>
<replica>
<host>second_trivial</host>
<port>9000</port>
</replica>
</shard>
</destination_trivial_cluster>
</remote_servers>
</yandex>

View File

@ -1,6 +1,6 @@
<yandex>
<logger>
<level>trace</level>
<level>information</level>
<log>/var/log/clickhouse-server/copier/log.log</log>
<errorlog>/var/log/clickhouse-server/copier/log.err.log</errorlog>
<size>1000M</size>

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<events>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>third</host>
<port>9000</port>
</replica>
</shard>
</events>
</remote_servers>
</yandex>

View File

@ -0,0 +1,6 @@
<?xml version="1.0"?>
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,28 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>information</level>
<log>/var/log/clickhouse-server/copier/log.log</log>
<errorlog>/var/log/clickhouse-server/copier/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/copier/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/copier/stdout.log</stdout>
</logger>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<node index="2">
<host>zoo2</host>
<port>2181</port>
</node>
<node index="3">
<host>zoo3</host>
<port>2181</port>
</node>
<session_timeout_ms>2000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<dbuser>
<password>12345678</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</dbuser>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,23 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
</yandex>

View File

@ -0,0 +1,6 @@
<?xml version="1.0"?>
<yandex>
<distributed_ddl>
<path>/clickhouse/task_queue/ddl</path>
</distributed_ddl>
</yandex>

View File

@ -0,0 +1,34 @@
<yandex>
<storage_configuration>
<disks>
<default>
</default>
<jbod1>
<path>/jbod1/</path>
</jbod1>
<jbod2>
<path>/jbod2/</path>
</jbod2>
<external>
<path>/external/</path>
</external>
</disks>
<policies>
<external_with_jbods>
<volumes>
<external>
<disk>external</disk>
</external>
<main>
<disk>jbod1</disk>
<disk>jbod2</disk>
</main>
</volumes>
</external_with_jbods>
</policies>
</storage_configuration>
</yandex>

View File

@ -0,0 +1,20 @@
<?xml version="1.0"?>
<yandex>
<logger>
<level>information</level>
<log>/var/log/clickhouse-server/copier/log.log</log>
<errorlog>/var/log/clickhouse-server/copier/log.err.log</errorlog>
<size>1000M</size>
<count>10</count>
<stderr>/var/log/clickhouse-server/copier/stderr.log</stderr>
<stdout>/var/log/clickhouse-server/copier/stdout.log</stdout>
</logger>
<zookeeper>
<node index="1">
<host>zoo1</host>
<port>2181</port>
</node>
<session_timeout_ms>2000</session_timeout_ms>
</zookeeper>
</yandex>

View File

@ -0,0 +1,32 @@
<?xml version="1.0"?>
<yandex>
<profiles>
<default>
<log_queries>1</log_queries>
</default>
</profiles>
<users>
<default>
<password></password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</default>
<dbuser>
<password>12345678</password>
<networks incl="networks" replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
<quota>default</quota>
</dbuser>
</users>
<quotas>
<default>
</default>
</quotas>
</yandex>

View File

@ -0,0 +1,42 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_drop_target_partition</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_drop_target_partition</database_push>
<table_push>destination</table_push>
<allow_to_drop_target_partitions>true</allow_to_drop_target_partitions>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column3, Column2, Column1)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_skip_index</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_skip_index</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column3, Column2, Column1)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,43 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<events>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second</host>
<port>9000</port>
</replica>
</shard>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>third</host>
<port>9000</port>
</replica>
</shard>
</events>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>events</cluster_pull>
<database_pull>dailyhistory</database_pull>
<table_pull>yellow_tripdata_staging</table_pull>
<cluster_push>events</cluster_push>
<database_push>monthlyhistory</database_push>
<table_push>yellow_tripdata_staging</table_push>
<engine>Engine=ReplacingMergeTree() PRIMARY KEY (tpep_pickup_datetime, id) ORDER BY (tpep_pickup_datetime, id) PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))</engine>
<sharding_key>sipHash64(id) % 3</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -44,7 +44,7 @@
<source_trivial_cluster>
<shard>
<replica>
<host>s0_0_0</host>
<host>first_trivial</host>
<port>9000</port>
</replica>
</shard>
@ -54,11 +54,11 @@
<destination_trivial_cluster>
<shard>
<replica>
<host>s1_0_0</host>
<host>second_trivial</host>
<port>9000</port>
</replica>
</shard>
</destination_trivial_cluster>
</remote_servers>
</yandex>
</yandex>

View File

@ -0,0 +1,64 @@
<?xml version="1.0"?>
<yandex>
<!-- How many simualteneous workers are posssible -->
<max_workers>3</max_workers>
<!-- Common setting for pull and push operations -->
<settings>
<connect_timeout>1</connect_timeout>
</settings>
<!-- Setting used to fetch data -->
<settings_pull>
<max_rows_in_distinct>0</max_rows_in_distinct>
</settings_pull>
<!-- Setting used to insert data -->
<settings_push>
</settings_push>
<!-- Tasks -->
<tables>
<hits>
<cluster_pull>source_trivial_cluster</cluster_pull>
<database_pull>default</database_pull>
<table_pull>trivial_without_arguments</table_pull>
<cluster_push>destination_trivial_cluster</cluster_push>
<database_push>default</database_push>
<table_push>trivial_without_arguments</table_push>
<!-- Engine of destination tables -->
<engine>ENGINE=ReplicatedMergeTree() PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16</engine>
<!-- Which sarding key to use while copying -->
<sharding_key>d + 1</sharding_key>
<!-- Optional expression that filter copying data -->
<where_condition>d - d = 0</where_condition>
</hits>
</tables>
<!-- Configuration of clusters -->
<remote_servers>
<source_trivial_cluster>
<shard>
<replica>
<host>first_trivial</host>
<port>9000</port>
</replica>
</shard>
</source_trivial_cluster>
<destination_trivial_cluster>
<shard>
<replica>
<host>second_trivial</host>
<port>9000</port>
</replica>
</shard>
</destination_trivial_cluster>
</remote_servers>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_ttl_columns</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_ttl_columns</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column3, Column2, Column1)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_move_to_volume</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_move_to_volume</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column3, Column2, Column1) TTL Column3 + INTERVAL 1 MONTH TO VOLUME 'external' SETTINGS storage_policy = 'external_with_jbods'</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -0,0 +1,40 @@
<?xml version="1.0"?>
<yandex>
<remote_servers>
<source>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>first_of_two</host>
<port>9000</port>
</replica>
</shard>
</source>
<destination>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>second_of_two</host>
<port>9000</port>
</replica>
</shard>
</destination>
</remote_servers>
<max_workers>2</max_workers>
<tables>
<table_events>
<cluster_pull>source</cluster_pull>
<database_pull>db_different_schema</database_pull>
<table_pull>source</table_pull>
<cluster_push>destination</cluster_push>
<database_push>db_different_schema</database_push>
<table_push>destination</table_push>
<engine>ENGINE = MergeTree() PARTITION BY toYYYYMMDD(Column3) ORDER BY (Column9, Column1, Column2, Column3, Column4)</engine>
<sharding_key>rand()</sharding_key>
</table_events>
</tables>
</yandex>

View File

@ -2,21 +2,26 @@ import os
import random
import sys
import time
from contextlib import contextmanager
import docker
import kazoo
import pytest
import string
import random
from contextlib import contextmanager
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import docker
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
COPYING_FAIL_PROBABILITY = 0.2
MOVING_FAIL_PROBABILITY = 0.2
cluster = ClickHouseCluster(__file__)
cluster = ClickHouseCluster(__file__, name='copier_test')
def generateRandomString(count):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(count))
def check_all_hosts_sucesfully_executed(tsv_content, num_hosts):
@ -72,8 +77,13 @@ class Task1:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_simple"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task0_description.xml'), 'r').read()
self.zk_task_path = "/clickhouse-copier/task_simple_" + generateRandomString(10)
self.container_task_file = "/task0_description.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task0_description.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
@ -112,9 +122,14 @@ class Task2:
def __init__(self, cluster, unique_zk_path):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_month_to_week_partition"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_month_to_week_description.xml'), 'r').read()
self.unique_zk_path = unique_zk_path
self.zk_task_path = "/clickhouse-copier/task_month_to_week_partition_" + generateRandomString(5)
self.unique_zk_path = generateRandomString(10)
self.container_task_file = "/task_month_to_week_description.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_month_to_week_description.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
@ -163,9 +178,14 @@ class Task_test_block_size:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_test_block_size"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_test_block_size.xml'), 'r').read()
self.zk_task_path = "/clickhouse-copier/task_test_block_size_" + generateRandomString(5)
self.rows = 1000000
self.container_task_file = "/task_test_block_size.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_test_block_size.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
@ -192,13 +212,19 @@ class Task_no_index:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_no_index"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_no_index.xml'), 'r').read()
self.zk_task_path = "/clickhouse-copier/task_no_index_" + generateRandomString(5)
self.rows = 1000000
self.container_task_file = "/task_no_index.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_no_index.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
instance.query("create table ontime (Year UInt16, FlightDate String) ENGINE = Memory")
instance.query("DROP TABLE IF EXISTS ontime SYNC")
instance.query("create table IF NOT EXISTS ontime (Year UInt16, FlightDate String) ENGINE = Memory")
instance.query("insert into ontime values (2016, 'test6'), (2017, 'test7'), (2018, 'test8')")
def check(self):
@ -214,32 +240,44 @@ class Task_no_arg:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_no_arg"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_no_arg.xml'), 'r').read()
self.rows = 1000000
self.container_task_file = "/task_no_arg.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_no_arg.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
instance.query("DROP TABLE IF EXISTS copier_test1 SYNC")
instance.query(
"create table copier_test1 (date Date, id UInt32) engine = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 8192")
"create table if not exists copier_test1 (date Date, id UInt32) engine = MergeTree PARTITION BY date ORDER BY date SETTINGS index_granularity = 8192")
instance.query("insert into copier_test1 values ('2016-01-01', 10);")
def check(self):
assert TSV(self.cluster.instances['s1_1_0'].query("SELECT date FROM copier_test1_1")) == TSV("2016-01-01\n")
instance = cluster.instances['s0_0_0']
instance.query("DROP TABLE copier_test1")
instance.query("DROP TABLE copier_test1 SYNC")
instance = cluster.instances['s1_1_0']
instance.query("DROP TABLE copier_test1_1")
instance.query("DROP TABLE copier_test1_1 SYNC")
class Task_non_partitioned_table:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_non_partitoned_table"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_non_partitioned_table.xml'), 'r').read()
self.rows = 1000000
self.container_task_file = "/task_non_partitioned_table.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_non_partitioned_table.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
instance.query("DROP TABLE IF EXISTS copier_test1 SYNC")
instance.query(
"create table copier_test1 (date Date, id UInt32) engine = MergeTree ORDER BY date SETTINGS index_granularity = 8192")
instance.query("insert into copier_test1 values ('2016-01-01', 10);")
@ -256,16 +294,23 @@ class Task_self_copy:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_self_copy"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_self_copy.xml'), 'r').read()
self.container_task_file = "/task_self_copy.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_self_copy.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
instance = cluster.instances['s0_0_0']
instance.query("CREATE DATABASE db1;")
instance.query("DROP DATABASE IF EXISTS db1 SYNC")
instance.query("DROP DATABASE IF EXISTS db2 SYNC")
instance.query("CREATE DATABASE IF NOT EXISTS db1;")
instance.query(
"CREATE TABLE db1.source_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
instance.query("CREATE DATABASE db2;")
"CREATE TABLE IF NOT EXISTS db1.source_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
instance.query("CREATE DATABASE IF NOT EXISTS db2;")
instance.query(
"CREATE TABLE db2.destination_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
"CREATE TABLE IF NOT EXISTS db2.destination_table (`a` Int8, `b` String, `c` Int8) ENGINE = MergeTree PARTITION BY a ORDER BY a SETTINGS index_granularity = 8192")
instance.query("INSERT INTO db1.source_table VALUES (1, 'ClickHouse', 1);")
instance.query("INSERT INTO db1.source_table VALUES (2, 'Copier', 2);")
@ -273,8 +318,8 @@ class Task_self_copy:
instance = cluster.instances['s0_0_0']
assert TSV(instance.query("SELECT * FROM db2.destination_table ORDER BY a")) == TSV(instance.query("SELECT * FROM db1.source_table ORDER BY a"))
instance = cluster.instances['s0_0_0']
instance.query("DROP DATABASE db1 SYNC")
instance.query("DROP DATABASE db2 SYNC")
instance.query("DROP DATABASE IF EXISTS db1 SYNC")
instance.query("DROP DATABASE IF EXISTS db2 SYNC")
def execute_task(started_cluster, task, cmd_options):
@ -283,26 +328,27 @@ def execute_task(started_cluster, task, cmd_options):
zk = started_cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
try:
zk.delete("/clickhouse-copier", recursive=True)
except kazoo.exceptions.NoNodeError:
print("No node /clickhouse-copier. It is Ok in first test.")
zk_task_path = task.zk_task_path
zk.ensure_path(zk_task_path)
zk.create(zk_task_path + "/description", task.copier_task_config.encode())
# Run cluster-copier processes on each node
docker_api = started_cluster.docker_client.api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-copier.xml',
'--task-path', zk_task_path,
'--task-path', task.zk_task_path,
'--task-file', task.container_task_file,
'--task-upload-force', 'true',
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
copiers = random.sample(list(cluster.instances.keys()), 3)
print(cmd)
copiers = random.sample(list(started_cluster.instances.keys()), 3)
for instance_name in copiers:
instance = started_cluster.instances[instance_name]
@ -330,18 +376,12 @@ def execute_task(started_cluster, task, cmd_options):
try:
task.check()
finally:
zk.delete(zk_task_path, recursive=True)
zk.delete(task.zk_task_path, recursive=True)
# Tests
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'), [False, True])
def test_copy_simple(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(started_cluster, Task1(started_cluster), ['--experimental-use-sample-offset', '1'])
@ -349,13 +389,7 @@ def test_copy_simple(started_cluster, use_sample_offset):
execute_task(started_cluster, Task1(started_cluster), [])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'),[False, True])
def test_copy_with_recovering(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(started_cluster, Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY),
@ -364,13 +398,7 @@ def test_copy_with_recovering(started_cluster, use_sample_offset):
execute_task(started_cluster, Task1(started_cluster), ['--copy-fault-probability', str(COPYING_FAIL_PROBABILITY)])
@pytest.mark.parametrize(
('use_sample_offset'),
[
False,
True
]
)
@pytest.mark.parametrize(('use_sample_offset'),[False, True])
def test_copy_with_recovering_after_move_faults(started_cluster, use_sample_offset):
if use_sample_offset:
execute_task(started_cluster, Task1(started_cluster), ['--move-fault-probability', str(MOVING_FAIL_PROBABILITY),
@ -412,9 +440,3 @@ def test_non_partitioned_table(started_cluster):
def test_self_copy(started_cluster):
execute_task(started_cluster, Task_self_copy(started_cluster), [])
if __name__ == '__main__':
with contextmanager(started_cluster)() as cluster:
for name, instance in list(cluster.instances.items()):
print(name, instance.ip_address)
input("Cluster created, press any key to destroy...")

View File

@ -0,0 +1,238 @@
import os
import sys
import time
import logging
import pytest
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import docker
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
cluster = ClickHouseCluster(__file__, name='copier_test_three_nodes')
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
for name in ["first", "second", "third"]:
cluster.add_instance(name,
main_configs=["configs_three_nodes/conf.d/clusters.xml", "configs_three_nodes/conf.d/ddl.xml"], user_configs=["configs_three_nodes/users.xml"],
with_zookeeper=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
class Task:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = '/clickhouse-copier/task'
self.container_task_file = "/task_taxi_data.xml"
for instance_name, _ in cluster.instances.items():
instance = cluster.instances[instance_name]
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, './task_taxi_data.xml'), self.container_task_file)
print("Copied task file to container of '{}' instance. Path {}".format(instance_name, self.container_task_file))
def start(self):
for name in ["first", "second", "third"]:
node = cluster.instances[name]
node.query("DROP DATABASE IF EXISTS dailyhistory SYNC;")
node.query("DROP DATABASE IF EXISTS monthlyhistory SYNC;")
instance = cluster.instances['first']
# daily partition database
instance.query("CREATE DATABASE IF NOT EXISTS dailyhistory on cluster events;")
instance.query("""CREATE TABLE dailyhistory.yellow_tripdata_staging ON CLUSTER events
(
id UUID DEFAULT generateUUIDv4(),
vendor_id String,
tpep_pickup_datetime DateTime('UTC'),
tpep_dropoff_datetime DateTime('UTC'),
passenger_count Nullable(Float64),
trip_distance String,
pickup_longitude Float64,
pickup_latitude Float64,
rate_code_id String,
store_and_fwd_flag String,
dropoff_longitude Float64,
dropoff_latitude Float64,
payment_type String,
fare_amount String,
extra String,
mta_tax String,
tip_amount String,
tolls_amount String,
improvement_surcharge String,
total_amount String,
pickup_location_id String,
dropoff_location_id String,
congestion_surcharge String,
junk1 String, junk2 String
)
Engine = ReplacingMergeTree()
PRIMARY KEY (tpep_pickup_datetime, id)
ORDER BY (tpep_pickup_datetime, id)
PARTITION BY (toYYYYMMDD(tpep_pickup_datetime))""")
instance.query("""CREATE TABLE dailyhistory.yellow_tripdata
ON CLUSTER events
AS dailyhistory.yellow_tripdata_staging
ENGINE = Distributed('events', 'dailyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
instance.query("""INSERT INTO dailyhistory.yellow_tripdata
SELECT * FROM generateRandom(
'id UUID DEFAULT generateUUIDv4(),
vendor_id String,
tpep_pickup_datetime DateTime(\\'UTC\\'),
tpep_dropoff_datetime DateTime(\\'UTC\\'),
passenger_count Nullable(Float64),
trip_distance String,
pickup_longitude Float64,
pickup_latitude Float64,
rate_code_id String,
store_and_fwd_flag String,
dropoff_longitude Float64,
dropoff_latitude Float64,
payment_type String,
fare_amount String,
extra String,
mta_tax String,
tip_amount String,
tolls_amount String,
improvement_surcharge String,
total_amount String,
pickup_location_id String,
dropoff_location_id String,
congestion_surcharge String,
junk1 String,
junk2 String',
1, 10, 2) LIMIT 50;""")
# monthly partition database
instance.query("create database IF NOT EXISTS monthlyhistory on cluster events;")
instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata_staging ON CLUSTER events
(
id UUID DEFAULT generateUUIDv4(),
vendor_id String,
tpep_pickup_datetime DateTime('UTC'),
tpep_dropoff_datetime DateTime('UTC'),
passenger_count Nullable(Float64),
trip_distance String,
pickup_longitude Float64,
pickup_latitude Float64,
rate_code_id String,
store_and_fwd_flag String,
dropoff_longitude Float64,
dropoff_latitude Float64,
payment_type String,
fare_amount String,
extra String,
mta_tax String,
tip_amount String,
tolls_amount String,
improvement_surcharge String,
total_amount String,
pickup_location_id String,
dropoff_location_id String,
congestion_surcharge String,
junk1 String,
junk2 String
)
Engine = ReplacingMergeTree()
PRIMARY KEY (tpep_pickup_datetime, id)
ORDER BY (tpep_pickup_datetime, id)
PARTITION BY (pickup_location_id, toYYYYMM(tpep_pickup_datetime))""")
instance.query("""CREATE TABLE monthlyhistory.yellow_tripdata
ON CLUSTER events
AS monthlyhistory.yellow_tripdata_staging
ENGINE = Distributed('events', 'monthlyhistory', yellow_tripdata_staging, sipHash64(id) % 3);""")
def check(self):
instance = cluster.instances["first"]
a = TSV(instance.query("SELECT count() from dailyhistory.yellow_tripdata"))
b = TSV(instance.query("SELECT count() from monthlyhistory.yellow_tripdata"))
assert a == b, "Distributed tables"
for instance_name, instance in cluster.instances.items():
instance = cluster.instances[instance_name]
a = instance.query("SELECT count() from dailyhistory.yellow_tripdata_staging")
b = instance.query("SELECT count() from monthlyhistory.yellow_tripdata_staging")
assert a == b, "MergeTree tables on each shard"
a = TSV(instance.query("SELECT sipHash64(*) from dailyhistory.yellow_tripdata_staging ORDER BY id"))
b = TSV(instance.query("SELECT sipHash64(*) from monthlyhistory.yellow_tripdata_staging ORDER BY id"))
assert a == b, "Data on each shard"
for name in ["first", "second", "third"]:
node = cluster.instances[name]
node.query("DROP DATABASE IF EXISTS dailyhistory SYNC;")
node.query("DROP DATABASE IF EXISTS monthlyhistory SYNC;")
def execute_task(started_cluster, task, cmd_options):
task.start()
zk = started_cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
# Run cluster-copier processes on each node
docker_api = docker.from_env().api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-copier.xml',
'--task-path', task.zk_task_path,
'--task-file', task.container_task_file,
'--task-upload-force', 'true',
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
print(cmd)
for instance_name, instance in started_cluster.instances.items():
instance = started_cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs_three_nodes/config-copier.xml"), "/etc/clickhouse-server/config-copier.xml")
logging.info("Copied copier config to {}".format(instance.name))
exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
output = docker_api.exec_start(exec_id).decode('utf8')
logging.info(output)
copiers_exec_ids.append(exec_id)
logging.info("Copier for {} ({}) has started".format(instance.name, instance.ip_address))
# time.sleep(1000)
# Wait for copiers stopping and check their return codes
for exec_id, instance in zip(copiers_exec_ids, iter(started_cluster.instances.values())):
while True:
res = docker_api.exec_inspect(exec_id)
if not res['Running']:
break
time.sleep(1)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
try:
task.check()
finally:
zk.delete(task.zk_task_path, recursive=True)
# Tests
@pytest.mark.timeout(600)
def test(started_cluster):
execute_task(started_cluster, Task(started_cluster), [])

View File

@ -0,0 +1,182 @@
import os
import sys
import time
import random
import string
from helpers.cluster import ClickHouseCluster
from helpers.test_tools import TSV
import kazoo
import pytest
import docker
CURRENT_TEST_DIR = os.path.dirname(os.path.abspath(__file__))
sys.path.insert(0, os.path.dirname(CURRENT_TEST_DIR))
COPYING_FAIL_PROBABILITY = 0.1
MOVING_FAIL_PROBABILITY = 0.1
cluster = ClickHouseCluster(__file__, name='copier_test_trivial')
def generateRandomString(count):
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(count))
@pytest.fixture(scope="module")
def started_cluster():
global cluster
try:
for name in ["first_trivial", "second_trivial"]:
instance = cluster.add_instance(name,
main_configs=["configs/conf.d/clusters_trivial.xml"],
user_configs=["configs_two_nodes/users.xml"],
macros={"cluster" : name, "shard" : "the_only_shard", "replica" : "the_only_replica"},
with_zookeeper=True)
cluster.start()
yield cluster
finally:
cluster.shutdown()
class TaskTrivial:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_trivial"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_trivial.xml'), 'r').read()
def start(self):
source = cluster.instances['first_trivial']
destination = cluster.instances['second_trivial']
for node in [source, destination]:
node.query("DROP DATABASE IF EXISTS default")
node.query("CREATE DATABASE IF NOT EXISTS default")
source.query("CREATE TABLE trivial (d UInt64, d1 UInt64 MATERIALIZED d+1)"
"ENGINE=ReplicatedMergeTree('/clickhouse/tables/source_trivial_cluster/1/trivial/{}', '1') "
"PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16".format(generateRandomString(10)))
source.query("INSERT INTO trivial SELECT * FROM system.numbers LIMIT 1002",
settings={"insert_distributed_sync": 1})
def check(self):
zk = cluster.get_kazoo_client('zoo1')
status_data, _ = zk.get(self.zk_task_path + "/status")
assert status_data == b'{"hits":{"all_partitions_count":5,"processed_partitions_count":5}}'
source = cluster.instances['first_trivial']
destination = cluster.instances['second_trivial']
assert TSV(source.query("SELECT count() FROM trivial")) == TSV("1002\n")
assert TSV(destination.query("SELECT count() FROM trivial")) == TSV("1002\n")
for node in [source, destination]:
node.query("DROP TABLE trivial")
class TaskReplicatedWithoutArguments:
def __init__(self, cluster):
self.cluster = cluster
self.zk_task_path = "/clickhouse-copier/task_trivial_without_arguments"
self.copier_task_config = open(os.path.join(CURRENT_TEST_DIR, 'task_trivial_without_arguments.xml'), 'r').read()
def start(self):
source = cluster.instances['first_trivial']
destination = cluster.instances['second_trivial']
for node in [source, destination]:
node.query("DROP DATABASE IF EXISTS default")
node.query("CREATE DATABASE IF NOT EXISTS default")
source.query("CREATE TABLE trivial_without_arguments ON CLUSTER source_trivial_cluster (d UInt64, d1 UInt64 MATERIALIZED d+1) "
"ENGINE=ReplicatedMergeTree() "
"PARTITION BY d % 5 ORDER BY (d, sipHash64(d)) SAMPLE BY sipHash64(d) SETTINGS index_granularity = 16")
source.query("INSERT INTO trivial_without_arguments SELECT * FROM system.numbers LIMIT 1002",
settings={"insert_distributed_sync": 1})
def check(self):
zk = cluster.get_kazoo_client('zoo1')
status_data, _ = zk.get(self.zk_task_path + "/status")
assert status_data == b'{"hits":{"all_partitions_count":5,"processed_partitions_count":5}}'
source = cluster.instances['first_trivial']
destination = cluster.instances['second_trivial']
assert TSV(source.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n")
assert TSV(destination.query("SELECT count() FROM trivial_without_arguments")) == TSV("1002\n")
for node in [source, destination]:
node.query("DROP TABLE trivial_without_arguments")
def execute_task(started_cluster, task, cmd_options):
task.start()
zk = started_cluster.get_kazoo_client('zoo1')
print("Use ZooKeeper server: {}:{}".format(zk.hosts[0][0], zk.hosts[0][1]))
try:
zk.delete("/clickhouse-copier", recursive=True)
except kazoo.exceptions.NoNodeError:
print("No node /clickhouse-copier. It is Ok in first test.")
zk_task_path = task.zk_task_path
zk.ensure_path(zk_task_path)
zk.create(zk_task_path + "/description", task.copier_task_config.encode())
# Run cluster-copier processes on each node
docker_api = started_cluster.docker_client.api
copiers_exec_ids = []
cmd = ['/usr/bin/clickhouse', 'copier',
'--config', '/etc/clickhouse-server/config-copier.xml',
'--task-path', zk_task_path,
'--base-dir', '/var/log/clickhouse-server/copier']
cmd += cmd_options
copiers = list(started_cluster.instances.keys())
for instance_name in copiers:
instance = started_cluster.instances[instance_name]
container = instance.get_docker_handle()
instance.copy_file_to_container(os.path.join(CURRENT_TEST_DIR, "configs/config-copier.xml"),
"/etc/clickhouse-server/config-copier.xml")
print("Copied copier config to {}".format(instance.name))
exec_id = docker_api.exec_create(container.id, cmd, stderr=True)
output = docker_api.exec_start(exec_id).decode('utf8')
print(output)
copiers_exec_ids.append(exec_id)
print("Copier for {} ({}) has started".format(instance.name, instance.ip_address))
# Wait for copiers stopping and check their return codes
for exec_id, instance_name in zip(copiers_exec_ids, copiers):
instance = started_cluster.instances[instance_name]
while True:
res = docker_api.exec_inspect(exec_id)
if not res['Running']:
break
time.sleep(0.5)
assert res['ExitCode'] == 0, "Instance: {} ({}). Info: {}".format(instance.name, instance.ip_address, repr(res))
try:
task.check()
finally:
zk.delete(zk_task_path, recursive=True)
# Tests
def test_trivial_copy(started_cluster):
execute_task(started_cluster, TaskTrivial(started_cluster), [])
def test_trivial_without_arguments(started_cluster):
execute_task(started_cluster, TaskReplicatedWithoutArguments(started_cluster), [])

Some files were not shown because too many files have changed in this diff Show More