Merge branch 'master' into fix-31538

This commit is contained in:
Nikolai Kochetov 2021-12-09 19:00:22 +03:00 committed by GitHub
commit e41974fabf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
209 changed files with 4590 additions and 1337 deletions

View File

@ -1498,7 +1498,7 @@ jobs:
############################# INTEGRATION TESTS #############################################
#############################################################################################
IntegrationTestsAsan:
needs: [BuilderDebAsan, FunctionalStatelessTestAsan]
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
@ -1526,7 +1526,7 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsTsan:
needs: [BuilderDebTsan, FunctionalStatelessTestTsan]
needs: [BuilderDebTsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
@ -1554,7 +1554,7 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsRelease:
needs: [BuilderDebRelease, FunctionalStatelessTestRelease]
needs: [BuilderDebRelease]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports

View File

@ -1268,7 +1268,7 @@ jobs:
############################# INTEGRATION TESTS #############################################
#############################################################################################
IntegrationTestsAsan:
needs: [BuilderDebAsan, FunctionalStatelessTestAsan]
needs: [BuilderDebAsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
@ -1296,7 +1296,7 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsTsan:
needs: [BuilderDebTsan, FunctionalStatelessTestTsan]
needs: [BuilderDebTsan]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
@ -1324,7 +1324,7 @@ jobs:
docker rm -f $(docker ps -a -q) ||:
sudo rm -fr $TEMP_PATH
IntegrationTestsRelease:
needs: [BuilderDebRelease, FunctionalStatelessTestRelease]
needs: [BuilderDebRelease]
runs-on: [self-hosted, stress-tester]
steps:
- name: Download json reports
@ -1623,7 +1623,7 @@ jobs:
env:
TEMP_PATH: ${{runner.temp}}/unit_tests_ubsan
REPORTS_PATH: ${{runner.temp}}/reports_dir
CHECK_NAME: 'Unit tests (msan, actions)'
CHECK_NAME: 'Unit tests (ubsan, actions)'
REPO_COPY: ${{runner.temp}}/unit_tests_ubsan/ClickHouse
run: |
sudo rm -fr $TEMP_PATH

View File

@ -149,6 +149,10 @@ if (ENABLE_FUZZING)
set (ENABLE_JEMALLOC 0)
set (ENABLE_CHECK_HEAVY_BUILDS 1)
set (GLIBC_COMPATIBILITY OFF)
# For codegen_select_fuzzer
set (ENABLE_PROTOBUF 1)
set (USE_INTERNAL_PROTOBUF_LIBRARY 1)
endif()
# Global libraries

View File

@ -2,11 +2,11 @@
# NOTE: has nothing common with DBMS_TCP_PROTOCOL_VERSION,
# only DBMS_TCP_PROTOCOL_VERSION should be incremented on protocol changes.
SET(VERSION_REVISION 54457)
SET(VERSION_REVISION 54458)
SET(VERSION_MAJOR 21)
SET(VERSION_MINOR 12)
SET(VERSION_MINOR 13)
SET(VERSION_PATCH 1)
SET(VERSION_GITHASH 503a418dedf0011e9040c3a1b6913e0b5488be4c)
SET(VERSION_DESCRIBE v21.12.1.1-prestable)
SET(VERSION_STRING 21.12.1.1)
SET(VERSION_GITHASH 4cc45c1e15912ee300bca7cc8b8da2b888a70e2a)
SET(VERSION_DESCRIBE v21.13.1.1-prestable)
SET(VERSION_STRING 21.13.1.1)
# end of autochange

4
debian/changelog vendored
View File

@ -1,5 +1,5 @@
clickhouse (21.12.1.1) unstable; urgency=low
clickhouse (21.13.1.1) unstable; urgency=low
* Modified source code
-- clickhouse-release <clickhouse-release@yandex-team.ru> Tue, 02 Nov 2021 00:56:42 +0300
-- clickhouse-release <clickhouse-release@yandex-team.ru> Thu, 09 Dec 2021 00:32:58 +0300

View File

@ -5,7 +5,7 @@ ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ARG repository="deb https://repo.clickhouse.com/deb/stable/ main/"
ARG version=21.12.1.*
ARG version=21.13.1.*
RUN apt-get update \
&& apt-get install --yes --no-install-recommends \

View File

@ -31,5 +31,6 @@ do
mv "$FUZZER_PATH" /output/fuzzers
done
tar -zcvf /output/fuzzers.tar.gz /output/fuzzers
rm -rf /output/fuzzers

View File

@ -5,7 +5,7 @@ ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ARG repository="deb https://repo.clickhouse.com/deb/stable/ main/"
ARG version=21.12.1.*
ARG version=21.13.1.*
ARG gosu_ver=1.10
# set non-empty deb_location_url url to create a docker image

View File

@ -1,7 +1,7 @@
FROM ubuntu:18.04
ARG repository="deb https://repo.clickhouse.com/deb/stable/ main/"
ARG version=21.12.1.*
ARG version=21.13.1.*
RUN apt-get update && \
apt-get install -y apt-transport-https dirmngr && \

View File

@ -1,5 +1,5 @@
#!/bin/bash
# shellcheck disable=SC2086,SC2001,SC2046
# shellcheck disable=SC2086,SC2001,SC2046,SC2030,SC2031
set -eux
set -o pipefail
@ -35,7 +35,7 @@ function clone
fi
git diff --name-only master HEAD | tee ci-changed-files.txt
else
if [ -v COMMIT_SHA ]; then
if [ -v SHA_TO_TEST ]; then
git fetch --depth 2 origin "$SHA_TO_TEST"
git checkout "$SHA_TO_TEST"
echo "Checked out nominal SHA $SHA_TO_TEST for master"
@ -189,6 +189,7 @@ continue
--receive_data_timeout_ms=10000 \
--stacktrace \
--query-fuzzer-runs=1000 \
--testmode \
--queries-file $(ls -1 ch/tests/queries/0_stateless/*.sql | sort -R) \
$NEW_TESTS_OPT \
> >(tail -n 100000 > fuzzer.log) \

View File

@ -61,7 +61,7 @@ function configure
cp -rv right/config left ||:
# Start a temporary server to rename the tables
while killall clickhouse-server; do echo . ; sleep 1 ; done
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
echo all killed
set -m # Spawn temporary in its own process groups
@ -88,7 +88,7 @@ function configure
clickhouse-client --port $LEFT_SERVER_PORT --query "create database test" ||:
clickhouse-client --port $LEFT_SERVER_PORT --query "rename table datasets.hits_v1 to test.hits" ||:
while killall clickhouse-server; do echo . ; sleep 1 ; done
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
echo all killed
# Make copies of the original db for both servers. Use hardlinks instead
@ -106,7 +106,7 @@ function configure
function restart
{
while killall clickhouse-server; do echo . ; sleep 1 ; done
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
echo all killed
# Change the jemalloc settings here.
@ -1409,7 +1409,7 @@ case "$stage" in
while env kill -- -$watchdog_pid ; do sleep 1; done
# Stop the servers to free memory for the subsequent query analysis.
while killall clickhouse; do echo . ; sleep 1 ; done
while pkill clickhouse-serv; do echo . ; sleep 1 ; done
echo Servers stopped.
;&
"analyze_queries")

View File

@ -61,6 +61,7 @@ chmod 777 -R /var/lib/clickhouse
clickhouse-client --query "SHOW DATABASES"
clickhouse-client --query "ATTACH DATABASE datasets ENGINE = Ordinary"
service clickhouse-server restart
# Wait for server to start accepting connections
@ -109,8 +110,13 @@ function run_tests()
fi
set +e
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time "${ADDITIONAL_OPTIONS[@]}" \
clickhouse-test --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time \
--skip 00168_parallel_processing_on_replicas "${ADDITIONAL_OPTIONS[@]}" \
"$SKIP_TESTS_OPTION" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee test_output/test_result.txt
clickhouse-test --timeout 1200 --testname --shard --zookeeper --check-zookeeper-session --no-stateless --hung-check --print-time \
00168_parallel_processing_on_replicas "${ADDITIONAL_OPTIONS[@]}" 2>&1 | ts '%Y-%m-%d %H:%M:%S' | tee -a test_output/test_result.txt
set -e
}

View File

@ -55,9 +55,41 @@ function configure()
echo "<clickhouse><asynchronous_metrics_update_period_s>1</asynchronous_metrics_update_period_s></clickhouse>" \
> /etc/clickhouse-server/config.d/asynchronous_metrics_update_period_s.xml
local total_mem
total_mem=$(awk '/MemTotal/ { print $(NF-1) }' /proc/meminfo) # KiB
total_mem=$(( total_mem*1024 )) # bytes
# Set maximum memory usage as half of total memory (less chance of OOM).
echo "<clickhouse><max_server_memory_usage_to_ram_ratio>0.5</max_server_memory_usage_to_ram_ratio></clickhouse>" \
> /etc/clickhouse-server/config.d/max_server_memory_usage_to_ram_ratio.xml
#
# But not via max_server_memory_usage but via max_memory_usage_for_user,
# so that we can override this setting and execute service queries, like:
# - hung check
# - show/drop database
# - ...
#
# So max_memory_usage_for_user will be a soft limit, and
# max_server_memory_usage will be hard limit, and queries that should be
# executed regardless memory limits will use max_memory_usage_for_user=0,
# instead of relying on max_untracked_memory
local max_server_mem
max_server_mem=$((total_mem*75/100)) # 75%
echo "Setting max_server_memory_usage=$max_server_mem"
cat > /etc/clickhouse-server/config.d/max_server_memory_usage.xml <<EOL
<clickhouse>
<max_server_memory_usage>${max_server_mem}</max_server_memory_usage>
</clickhouse>
EOL
local max_users_mem
max_users_mem=$((total_mem*50/100)) # 50%
echo "Setting max_memory_usage_for_user=$max_users_mem"
cat > /etc/clickhouse-server/users.d/max_memory_usage_for_user.xml <<EOL
<clickhouse>
<profiles>
<default>
<max_memory_usage_for_user>${max_users_mem}</max_memory_usage_for_user>
</default>
</profiles>
</clickhouse>
EOL
}
function stop()

View File

@ -75,6 +75,9 @@ def call_with_retry(query, timeout=30, retry_count=5):
else:
break
def make_query_command(query):
return f"""clickhouse client -q "{query}" --max_untracked_memory=1Gi --memory_profiler_step=1Gi --max_memory_usage_for_user=0"""
def prepare_for_hung_check(drop_databases):
# FIXME this function should not exist, but...
@ -88,40 +91,41 @@ def prepare_for_hung_check(drop_databases):
logging.info("Will terminate gdb (if any)")
call_with_retry("kill -TERM $(pidof gdb)")
# Some tests set too low memory limit for default user and forget to reset in back.
# It may cause SYSTEM queries to fail, let's disable memory limit.
call_with_retry("clickhouse client --max_memory_usage_for_user=0 -q 'SELECT 1 FORMAT Null'")
call_with_retry(make_query_command('SELECT 1 FORMAT Null'))
# Some tests execute SYSTEM STOP MERGES or similar queries.
# It may cause some ALTERs to hang.
# Possibly we should fix tests and forbid to use such queries without specifying table.
call_with_retry("clickhouse client -q 'SYSTEM START MERGES'")
call_with_retry("clickhouse client -q 'SYSTEM START DISTRIBUTED SENDS'")
call_with_retry("clickhouse client -q 'SYSTEM START TTL MERGES'")
call_with_retry("clickhouse client -q 'SYSTEM START MOVES'")
call_with_retry("clickhouse client -q 'SYSTEM START FETCHES'")
call_with_retry("clickhouse client -q 'SYSTEM START REPLICATED SENDS'")
call_with_retry("clickhouse client -q 'SYSTEM START REPLICATION QUEUES'")
call_with_retry(make_query_command('SYSTEM START MERGES'))
call_with_retry(make_query_command('SYSTEM START DISTRIBUTED SENDS'))
call_with_retry(make_query_command('SYSTEM START TTL MERGES'))
call_with_retry(make_query_command('SYSTEM START MOVES'))
call_with_retry(make_query_command('SYSTEM START FETCHES'))
call_with_retry(make_query_command('SYSTEM START REPLICATED SENDS'))
call_with_retry(make_query_command('SYSTEM START REPLICATION QUEUES'))
call_with_retry(make_query_command('SYSTEM DROP MARK CACHE'))
# Issue #21004, live views are experimental, so let's just suppress it
call_with_retry("""clickhouse client -q "KILL QUERY WHERE upper(query) LIKE 'WATCH %'" """)
call_with_retry(make_query_command("KILL QUERY WHERE upper(query) LIKE 'WATCH %'"))
# Kill other queries which known to be slow
# It's query from 01232_preparing_sets_race_condition_long, it may take up to 1000 seconds in slow builds
call_with_retry("""clickhouse client -q "KILL QUERY WHERE query LIKE 'insert into tableB select %'" """)
call_with_retry(make_query_command("KILL QUERY WHERE query LIKE 'insert into tableB select %'"))
# Long query from 00084_external_agregation
call_with_retry("""clickhouse client -q "KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u %'" """)
call_with_retry(make_query_command("KILL QUERY WHERE query LIKE 'SELECT URL, uniq(SearchPhrase) AS u FROM test.hits GROUP BY URL ORDER BY u %'"))
if drop_databases:
for i in range(5):
try:
# Here we try to drop all databases in async mode. If some queries really hung, than drop will hung too.
# Otherwise we will get rid of queries which wait for background pool. It can take a long time on slow builds (more than 900 seconds).
databases = check_output('clickhouse client -q "SHOW DATABASES"', shell=True, timeout=30).decode('utf-8').strip().split()
#
# Also specify max_untracked_memory to allow 1GiB of memory to overcommit.
databases = check_output(make_query_command('SHOW DATABASES'), shell=True, timeout=30).decode('utf-8').strip().split()
for db in databases:
if db == "system":
continue
command = f'clickhouse client -q "DROP DATABASE {db}"'
command = make_query_command(f'DROP DATABASE {db}')
# we don't wait for drop
Popen(command, shell=True)
break
@ -133,9 +137,15 @@ def prepare_for_hung_check(drop_databases):
# Wait for last queries to finish if any, not longer than 300 seconds
call("""clickhouse client -q "select sleepEachRow((
select maxOrDefault(300 - elapsed) + 1 from system.processes where query not like '%from system.processes%' and elapsed < 300
) / 300) from numbers(300) format Null" """, shell=True, stderr=STDOUT, timeout=330)
call(make_query_command("""
select sleepEachRow((
select maxOrDefault(300 - elapsed) + 1
from system.processes
where query not like '%from system.processes%' and elapsed < 300
) / 300)
from numbers(300)
format Null
"""), shell=True, stderr=STDOUT, timeout=330)
# Even if all clickhouse-test processes are finished, there are probably some sh scripts,
# which still run some new queries. Let's ignore them.
@ -188,7 +198,24 @@ if __name__ == "__main__":
if args.hung_check:
have_long_running_queries = prepare_for_hung_check(args.drop_databases)
logging.info("Checking if some queries hung")
cmd = "{} {} {}".format(args.test_cmd, "--hung-check", "00001_select_1")
cmd = ' '.join([args.test_cmd,
# Do not track memory allocations up to 1Gi,
# this will allow to ignore server memory limit (max_server_memory_usage) for this query.
#
# NOTE: memory_profiler_step should be also adjusted, because:
#
# untracked_memory_limit = min(settings.max_untracked_memory, settings.memory_profiler_step)
#
# NOTE: that if there will be queries with GROUP BY, this trick
# will not work due to CurrentMemoryTracker::check() from
# Aggregator code.
# But right now it should work, since neither hung check, nor 00001_select_1 has GROUP BY.
"--client-option", "max_untracked_memory=1Gi",
"--client-option", "max_memory_usage_for_user=0",
"--client-option", "memory_profiler_step=1Gi",
"--hung-check",
"00001_select_1"
])
res = call(cmd, shell=True, stderr=STDOUT)
hung_check_status = "No queries hung\tOK\n"
if res != 0 and have_long_running_queries:

View File

@ -262,7 +262,7 @@ In the example below, the index cant be used.
SELECT count() FROM table WHERE CounterID = 34 OR URL LIKE '%upyachka%'
```
To check whether ClickHouse can use the index when running a query, use the settings [force_index_by_date](../../../operations/settings/settings.md#settings-force_index_by_date) and [force_primary_key](../../../operations/settings/settings.md).
To check whether ClickHouse can use the index when running a query, use the settings [force_index_by_date](../../../operations/settings/settings.md#settings-force_index_by_date) and [force_primary_key](../../../operations/settings/settings.md#force-primary-key).
The key for partitioning by month allows reading only those data blocks which contain dates from the proper range. In this case, the data block may contain data for many dates (up to an entire month). Within a block, data is sorted by primary key, which might not contain the date as the first column. Because of this, using a query with only a date condition that does not specify the primary key prefix will cause more data to be read than for a single date.

View File

@ -356,3 +356,24 @@ Possible values:
- 1 — Parts are detached.
Default value: `0`.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
Sets the interval in seconds for ClickHouse to execute the cleanup of old temporary directories.
Possible values:
- Any positive integer.
Default value: `60` seconds.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
Sets the interval in seconds for ClickHouse to execute the cleanup of old parts, WALs, and mutations.
Possible values:
- Any positive integer.
Default value: `1` second.

View File

@ -885,26 +885,6 @@ Possible values:
Default value: 2013265920.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
Sets the interval in seconds for ClickHouse to execute the cleanup of old temporary directories.
Possible values:
- Any positive integer.
Default value: `60` seconds.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
Sets the interval in seconds for ClickHouse to execute the cleanup of old parts, WALs, and mutations.
Possible values:
- Any positive integer.
Default value: `1` second.
## min_bytes_to_use_direct_io {#settings-min-bytes-to-use-direct-io}
The minimum data volume required for using direct I/O access to the storage disk.
@ -992,9 +972,16 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING'
Setting up query threads logging.
Queries threads run by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter.
Query threads log into [system.query_thread_log](../../operations/system-tables/query_thread_log.md) table. This setting have effect only when [log_queries](#settings-log-queries) is true. Queries threads run by ClickHouse with this setup are logged according to the rules in the [query_thread_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log) server configuration parameter.
Example:
Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: `1`.
**Example**
``` text
log_query_threads=1

View File

@ -4,8 +4,8 @@ Contains information about the dependent views executed when running a query, fo
To start logging:
1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section.
2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1.
1. Configure parameters in the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) section.
2. Set [log_query_views](../../operations/settings/settings.md#settings-log-query-views) to 1.
The flushing period of data is set in `flush_interval_milliseconds` parameter of the [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log) server settings section. To force flushing, use the [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs) query.

View File

@ -0,0 +1,64 @@
---
toc_priority: 311
toc_title: sparkbar
---
# sparkbar {#sparkbar}
The function plots a frequency histogram for values `x` and the repetition rate `y` of these values over the interval `[min_x, max_x]`.
If no interval is specified, then the minimum `x` is used as the interval start, and the maximum `x` — as the interval end.
**Syntax**
``` sql
sparkbar(width[, min_x, max_x])(x, y)
```
**Parameters**
- `width` — The number of segments. Type: [Integer](../../../sql-reference/data-types/int-uint.md).
- `min_x` — The interval start. Optional parameter.
- `max_x` — The interval end. Optional parameter.
**Arguments**
- `x` — The field with values.
- `y` — The field with the frequency of values.
**Returned value**
- The frequency histogram.
**Example**
Query:
``` sql
CREATE TABLE spark_bar_data (`cnt` UInt64,`event_date` Date) ENGINE = MergeTree ORDER BY event_date SETTINGS index_granularity = 8192;
INSERT INTO spark_bar_data VALUES(1,'2020-01-01'),(4,'2020-01-02'),(5,'2020-01-03'),(2,'2020-01-04'),(3,'2020-01-05'),(7,'2020-01-06'),(6,'2020-01-07'),(8,'2020-01-08'),(2,'2020-01-11');
SELECT sparkbar(9)(event_date,cnt) FROM spark_bar_data;
SELECT sparkbar(9,toDate('2020-01-01'),toDate('2020-01-10'))(event_date,cnt) FROM spark_bar_data;
```
Result:
``` text
┌─sparkbar(9)(event_date, cnt)─┐
│ │
│ ▁▅▄▃██▅ ▁ │
│ │
└──────────────────────────────┘
┌─sparkbar(9, toDate('2020-01-01'), toDate('2020-01-10'))(event_date, cnt)─┐
│ │
│▁▄▄▂▅▇█▁ │
│ │
└──────────────────────────────────────────────────────────────────────────┘
```

View File

@ -175,6 +175,7 @@ in which the `Strings` represents the named fields of the tuple and `T` are the
``` sql
tupleToNameValuePairs(tuple)
```
**Arguments**
@ -196,7 +197,7 @@ CREATE TABLE tupletest (`col` Tuple(user_ID UInt64, session_ID UInt64) ENGINE =
INSERT INTO tupletest VALUES (tuple( 100, 2502)), (tuple(1,100));
SELECT tupleToNameValuePairs(col) FROM tupletest;
```
```
Result:

View File

@ -0,0 +1,112 @@
---
toc_priority: 68
toc_title: Window View
---
# Window View Functions {#window-view-functions}
Window view functions return the inclusive lower and exclusive upper bound of the corresponding window. The functions for working with WindowView are listed below:
## tumble {#window-view-functions-tumble}
A tumbling time window assigns records to non-overlapping, continuous windows with a fixed duration (`interval`).
``` sql
tumble(time_attr, interval [, timezone])
```
**Arguments**
- `time_attr` - Date and time. [DateTime](../../sql-reference/data-types/datetime.md) data type.
- `interval` - Window interval in [Interval](../../sql-reference/data-types/special-data-types/interval.md) data type.
- `timezone` — [Timezone name](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone) (optional).
**Returned values**
- The inclusive lower and exclusive upper bound of the corresponding tumbling window.
Type: `Tuple(DateTime, DateTime)`
**Example**
Query:
``` sql
SELECT tumble(now(), toIntervalDay('1'))
```
Result:
``` text
┌─tumble(now(), toIntervalDay('1'))─────────────┐
│ ['2020-01-01 00:00:00','2020-01-02 00:00:00'] │
└───────────────────────────────────────────────┘
```
## hop {#window-view-functions-hop}
A hopping time window has a fixed duration (`window_interval`) and hops by a specified hop interval (`hop_interval`). If the `hop_interval` is smaller than the `window_interval`, hopping windows are overlapping. Thus, records can be assigned to multiple windows.
``` sql
hop(time_attr, hop_interval, window_interval [, timezone])
```
**Arguments**
- `time_attr` - Date and time. [DateTime](../../sql-reference/data-types/datetime.md) data type.
- `hop_interval` - Hop interval in [Interval](../../sql-reference/data-types/special-data-types/interval.md) data type. Should be a positive number.
- `window_interval` - Window interval in [Interval](../../sql-reference/data-types/special-data-types/interval.md) data type. Should be a positive number.
- `timezone` — [Timezone name](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone) (optional).
**Returned values**
- The inclusive lower and exclusive upper bound of the corresponding hopping window. Since one record can be assigned to multiple hop windows, the function only returns the bound of the **first** window when hop function is used **without** `WINDOW VIEW`.
Type: `Tuple(DateTime, DateTime)`
**Example**
Query:
``` sql
SELECT hop(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND)
```
Result:
``` text
┌─hop(now(), toIntervalSecond('1'), toIntervalSecond('2'))──┐
│ ('2020-01-14 16:58:22','2020-01-14 16:58:24') │
└───────────────────────────────────────────────────────────┘
```
## tumbleStart {#window-view-functions-tumblestart}
Returns the inclusive lower bound of the corresponding tumbling window.
``` sql
tumbleStart(time_attr, interval [, timezone]);
```
## tumbleEnd {#window-view-functions-tumbleend}
Returns the exclusive upper bound of the corresponding tumbling window.
``` sql
tumbleEnd(time_attr, interval [, timezone]);
```
## hopStart {#window-view-functions-hopstart}
Returns the inclusive lower bound of the corresponding hopping window.
``` sql
hopStart(time_attr, hop_interval, window_interval [, timezone]);
```
## hopEnd {#window-view-functions-hopend}
Returns the exclusive upper bound of the corresponding hopping window.
``` sql
hopEnd(time_attr, hop_interval, window_interval [, timezone]);
```

View File

@ -5,7 +5,7 @@ toc_title: VIEW
# CREATE VIEW {#create-view}
Creates a new view. Views can be [normal](#normal), [materialized](#materialized) and [live](#live-view) (the latter is an experimental feature).
Creates a new view. Views can be [normal](#normal), [materialized](#materialized), [live](#live-view), and [window](#window-view) (live view and window view are experimental features).
## Normal View {#normal}
@ -243,3 +243,119 @@ Most common uses of live view tables include:
**See Also**
- [ALTER LIVE VIEW](../alter/view.md#alter-live-view)
## Window View [Experimental] {#window-view}
!!! important "Important"
This is an experimental feature that may change in backwards-incompatible ways in the future releases.
Enable usage of window views and `WATCH` query using [allow_experimental_window_view](../../../operations/settings/settings.md#allow-experimental-window-view) setting. Input the command `set allow_experimental_window_view = 1`.
``` sql
CREATE WINDOW VIEW [IF NOT EXISTS] [db.]table_name [TO [db.]table_name] [ENGINE = engine] [WATERMARK = strategy] [ALLOWED_LATENESS = interval_function] AS SELECT ... GROUP BY window_view_function
```
Window view can aggregate data by time window and output the results when the window is ready to fire. It stores the partial aggregation results in an inner(or specified) table to reduce latency and can push the processing result to a specified table or push notifications using the WATCH query.
Creating a window view is similar to creating `MATERIALIZED VIEW`. Window view needs an inner storage engine to store intermediate data. The inner storage will use `AggregatingMergeTree` as the default engine.
### Window View Functions {#window-view-windowviewfunctions}
[Window view functions](../../functions/window-view-functions.md) are used to get the lower and upper window bound of records. The window view needs to be used with a window view function.
### TIME ATTRIBUTES {#window-view-timeattributes}
Window view supports **processing time** and **event time** process.
**Processing time** allows window view to produce results based on the local machine's time and is used by default. It is the most straightforward notion of time but does not provide determinism. The processing time attribute can be defined by setting the `time_attr` of the window view function to a table column or using the function `now()`. The following query creates a window view with processing time.
``` sql
CREATE WINDOW VIEW wv AS SELECT count(number), tumbleStart(w_id) as w_start from date GROUP BY tumble(now(), INTERVAL '5' SECOND) as w_id
```
**Event time** is the time that each individual event occurred on its producing device. This time is typically embedded within the records when it is generated. Event time processing allows for consistent results even in case of out-of-order events or late events. Window view supports event time processing by using `WATERMARK` syntax.
Window view provides three watermark strategies:
* `STRICTLY_ASCENDING`: Emits a watermark of the maximum observed timestamp so far. Rows that have a timestamp smaller to the max timestamp are not late.
* `ASCENDING`: Emits a watermark of the maximum observed timestamp so far minus 1. Rows that have a timestamp equal and smaller to the max timestamp are not late.
* `BOUNDED`: WATERMARK=INTERVAL. Emits watermarks, which are the maximum observed timestamp minus the specified delay.
The following queries are examples of creating a window view with `WATERMARK`:
``` sql
CREATE WINDOW VIEW wv WATERMARK=STRICTLY_ASCENDING AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
CREATE WINDOW VIEW wv WATERMARK=ASCENDING AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
CREATE WINDOW VIEW wv WATERMARK=INTERVAL '3' SECOND AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
```
By default, the window will be fired when the watermark comes, and elements that arrived behind the watermark will be dropped. Window view supports late event processing by setting `ALLOWED_LATENESS=INTERVAL`. An example of lateness handling is:
``` sql
CREATE WINDOW VIEW test.wv TO test.dst WATERMARK=ASCENDING ALLOWED_LATENESS=INTERVAL '2' SECOND AS SELECT count(a) AS count, tumbleEnd(wid) AS w_end FROM test.mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND) AS wid;
```
Note that elements emitted by a late firing should be treated as updated results of a previous computation. Instead of firing at the end of windows, the window view will fire immediately when the late event arrives. Thus, it will result in multiple outputs for the same window. Users need to take these duplicated results into account or deduplicate them.
### Monitoring New Windows{#window-view-monitoring}
Window view supports the `WATCH` query to constantly append the processing results to the console or use `TO` syntax to output the results to a table.
``` sql
WATCH [db.]name [LIMIT n]
```
`WATCH` query acts similar as in `LIVE VIEW`. A `LIMIT` can be specified to set the number of updates to receive before terminating the query.
### Settings {#window-view-settings}
- `window_view_clean_interval`: The clean interval of window view in seconds to free outdated data. The system will retain the windows that have not been fully triggered according to the system time or `WATERMARK` configuration, and the other data will be deleted.
- `window_view_heartbeat_interval`: The heartbeat interval in seconds to indicate the watch query is alive.
### Example {#window-view-example}
Suppose we need to count the number of click logs per 10 seconds in a log table called `data`, and its table structure is:
``` sql
CREATE TABLE data ( `id` UInt64, `timestamp` DateTime) ENGINE = Memory;
```
First, we create a window view with tumble window of 10 seconds interval:
``` sql
CREATE WINDOW VIEW wv as select count(id), tumbleStart(w_id) as window_start from data group by tumble(timestamp, INTERVAL '10' SECOND) as w_id
```
Then, we use the `WATCH` query to get the results.
``` sql
WATCH wv
```
When logs are inserted into table `data`,
``` sql
INSERT INTO data VALUES(1,now())
```
The `WATCH` query should print the results as follows:
``` text
┌─count(id)─┬────────window_start─┐
│ 1 │ 2020-01-14 16:56:40 │
└───────────┴─────────────────────┘
```
Alternatively, we can attach the output to another table using `TO` syntax.
``` sql
CREATE WINDOW VIEW wv TO dst AS SELECT count(id), tumbleStart(w_id) as window_start FROM data GROUP BY tumble(timestamp, INTERVAL '10' SECOND) as w_id
```
Additional examples can be found among stateful tests of ClickHouse (they are named `*window_view*` there).
### Window View Usage {#window-view-usage}
The window view is useful in the following scenarios:
* **Monitoring**: Aggregate and calculate the metrics logs by time, and output the results to a target table. The dashboard can use the target table as a source table.
* **Analyzing**: Automatically aggregate and preprocess data in the time window. This can be useful when analyzing a large number of logs. The preprocessing eliminates repeated calculations in multiple queries and reduces query latency.

View File

@ -999,14 +999,14 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
Настройки логирования информации о зависимых представлениях (materialized, live и т.п.) в запросах принятых с настройкой [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views).
Запросы сохраняются в таблицу system.query_views_log. Вы можете изменить название этой таблицы в параметре `table` (см. ниже).
Запросы логируются в таблице [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log). Вы можете изменить название этой таблицы в параметре `table` (см. ниже).
При настройке логирования используются следующие параметры:
- `database` имя базы данных.
- `table` имя таблицы куда будут записываться использованные представления.
- `partition_by` — устанавливает [произвольный ключ партиционирования](../../engines/table-engines/mergetree-family/custom-partitioning-key.md). Нельзя использовать если используется `engine`
- `engine` - устанавливает [настройки MergeTree Engine](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) для системной таблицы. Нельзя использовать если используется `partition_by`.
- `table` имя системной таблицы, где будут логироваться запросы.
- `partition_by` — устанавливает [произвольный ключ партиционирования](../../engines/table-engines/mergetree-family/custom-partitioning-key.md). Нельзя использовать, если задан параметр `engine`.
- `engine` устанавливает [настройки MergeTree Engine](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) для системной таблицы. Нельзя использовать, если задан параметр `partition_by`.
- `flush_interval_milliseconds` — период сброса данных из буфера в памяти в таблицу.
Если таблица не существует, то ClickHouse создаст её. Если структура журнала запросов изменилась при обновлении сервера ClickHouse, то таблица со старой структурой переименовывается, а новая таблица создается автоматически.

View File

@ -355,3 +355,23 @@ Eсли суммарное число активных кусков во все
- 1 — куски данных открепляются.
Значение по умолчанию: `0`.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
Задает интервал в секундах для удаления старых временных каталогов на сервере ClickHouse.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `60` секунд.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
Задает интервал в секундах для удаления старых кусков данных, журналов предзаписи (WAL) и мутаций на сервере ClickHouse.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `1` секунда.

View File

@ -807,26 +807,6 @@ ClickHouse может парсить только базовый формат `Y
Значение по умолчанию: 2013265920.
## merge_tree_clear_old_temporary_directories_interval_seconds {#setting-merge-tree-clear-old-temporary-directories-interval-seconds}
Задает интервал в секундах для удаления старых временных каталогов на сервере ClickHouse.
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `60` секунд.
## merge_tree_clear_old_parts_interval_seconds {#setting-merge-tree-clear-old-parts-interval-seconds}
Задает интервал в секундах для удаления старых кусков данных, журналов предзаписи (WAL) и мутаций на сервере ClickHouse .
Возможные значения:
- Положительное целое число.
Значение по умолчанию: `1` секунда.
## min_bytes_to_use_direct_io {#settings-min-bytes-to-use-direct-io}
Минимальный объём данных, необходимый для прямого (небуферизованного) чтения/записи (direct I/O) на диск.
@ -912,11 +892,18 @@ log_queries_min_type='EXCEPTION_WHILE_PROCESSING'
## log_query_threads {#settings-log-query-threads}
Установка логирования информации о потоках выполнения запроса.
Управляет логированием информации о потоках выполнения запросов.
Лог информации о потоках выполнения запросов, переданных в ClickHouse с этой установкой, записывается согласно правилам конфигурационного параметра сервера [query_thread_log](../server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log).
Информация о потоках выполнения запросов сохраняется в системной таблице [system.query_thread_log](../../operations/system-tables/query_thread_log.md). Работает только в том случае, если включена настройка [log_queries](#settings-log-queries). Лог информации о потоках выполнения запросов, переданных в ClickHouse с этой установкой, записывается согласно правилам конфигурационного параметра сервера [query_thread_log](../server-configuration-parameters/settings.md#server_configuration_parameters-query_thread_log).
Пример:
Возможные значения:
- 0 — отключено.
- 1 — включено.
Значение по умолчанию: `1`.
**Пример**
``` text
log_query_threads=1

View File

@ -55,6 +55,7 @@ ClickHouse не удаляет данные из таблица автомати
- `query_kind` ([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md)) — тип запроса.
- `databases` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — имена баз данных, присутствующих в запросе.
- `tables` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — имена таблиц, присутствующих в запросе.
- `views` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — имена представлений (материализованные или live), которые представленны в запросе.
- `columns` ([Array](../../sql-reference/data-types/array.md)([LowCardinality(String)](../../sql-reference/data-types/lowcardinality.md))) — имена столбцов, присутствующих в запросе.
- `projections` ([String](../../sql-reference/data-types/string.md)) — имена проекций, использованных при выполнении запроса.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — код исключения.

View File

@ -112,5 +112,5 @@ ProfileEvents: {'Query':1,'SelectQuery':1,'ReadCompressedBytes':36,'Compr
**Смотрите также**
- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — описание системной таблицы `query_log`, которая содержит общую информацию о выполненных запросах.
- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — описание системной таблицы `query_log`, которая содержит общую информацию о выполненных запросах.
- [system.query_views_log](../../operations/system-tables/query_views_log.md#system_tables-query_views_log) — описание системной таблицы `query_views_log`, которая содержит информацию о всех представлениях, участвующих в выполненных запросах.

View File

@ -1 +0,0 @@
../../../en/operations/system-tables/query_views_log.md

View File

@ -0,0 +1,84 @@
# system.query_views_log {#system_tables-query_views_log}
Содержит информацию о зависимых представлениях, выполняемых при выполнении запроса, например, тип представления или время выполнения.
Чтобы начать ведение журнала:
1. Настройте параметры в разделе [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log).
2. Включите настройку [log_query_views=1](../../operations/settings/settings.md#settings-log-query-views).
Период сброса данных из буфера в памяти задается в параметре `flush_interval_milliseconds` в разделе настроек сервера [query_views_log](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-query_views_log ). Для принудительного сброса используйте запрос [SYSTEM FLUSH LOGS](../../sql-reference/statements/system.md#query_language-system-flush_logs).
ClickHouse не удаляет данные из таблицы автоматически. Подробнее смотрите раздел [Системные таблицы](../../operations/system-tables/index.md#system-tables-introduction).
Чтобы уменьшить количество запросов, регистрируемых в таблице `query_views_log`, вы можете включить настройку [log_queries_probability](../../operations/settings/settings.md#log-queries-probability).
Столбцы:
- `event_date` ([Date](../../sql-reference/data-types/date.md)) — дата, когда произошло последнее событие с представлением.
- `event_time` ([DateTime](../../sql-reference/data-types/datetime.md)) — дата и время завершения выполнения представления.
- `event_time_microseconds` ([DateTime](../../sql-reference/data-types/datetime.md)) — дата и время завершения выполнения представления с точностью до микросекунд.
- `view_duration_ms` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — продолжительность выполнения представления (сумма его этапов) в миллисекундах.
- `initial_query_id` ([String](../../sql-reference/data-types/string.md)) — идентификатор начального запроса (при распределённом выполнении запроса).
- `view_name` ([String](../../sql-reference/data-types/string.md)) — имя представления.
- `view_uuid` ([UUID](../../sql-reference/data-types/uuid.md)) — UUID представления.
- `view_type` ([Enum8](../../sql-reference/data-types/enum.md)) — тип представления. Возможные значения:
- `'Default' = 1` — [обычные представления](../../sql-reference/statements/create/view.md#normal). Не должно появляться в этом журнале.
- `'Materialized' = 2` — [материализованные представления](../../sql-reference/statements/create/view.md#materialized).
- `'Live' = 3` — [live представления](../../sql-reference/statements/create/view.md#live-view).
- `view_query` ([String](../../sql-reference/data-types/string.md)) — запрос, выполняемый представлением.
- `view_target` ([String](../../sql-reference/data-types/string.md)) — имя целевой таблицы представления.
- `read_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — количество прочитанных строк.
- `read_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — количество прочитанных байт.
- `written_rows` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — количество записанных строк.
- `written_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md#uint-ranges)) — количество записанных байт.
- `peak_memory_usage` ([Int64](../../sql-reference/data-types/int-uint.md)) — максимальная разница между объемом выделенной и освобожденной памяти в контексте этого представления.
- `ProfileEvents` ([Map(String, UInt64)](../../sql-reference/data-types/array.md)) — события профиля, которые измеряют различные показатели. Их описание можно найти в таблице [system.events](../../operations/system-tables/events.md#system_tables-events).
- `status` ([Enum8](../../sql-reference/data-types/enum.md)) — статус представления. Возможные значения:
- `'QueryStart' = 1` — успешное начало выполнения представления. Не должно отображаться.
- `'QueryFinish' = 2` — успешное завершение выполнения представления.
- `'ExceptionBeforeStart' = 3` — исключение до начала выполнения представления.
- `'ExceptionWhileProcessing' = 4` — исключение во время выполнения представления.
- `exception_code` ([Int32](../../sql-reference/data-types/int-uint.md)) — код исключения.
- `exception` ([String](../../sql-reference/data-types/string.md)) — сообщение исключения.
- `stack_trace` ([String](../../sql-reference/data-types/string.md)) — [трассировка стека](https://ru.wikipedia.org/wiki/Трассировка_стека). Пустая строка, если запрос был успешно выполнен.
**Пример**
Запрос:
``` sql
SELECT * FROM system.query_views_log LIMIT 1 \G;
```
Результат:
``` text
Row 1:
──────
event_date: 2021-06-22
event_time: 2021-06-22 13:23:07
event_time_microseconds: 2021-06-22 13:23:07.738221
view_duration_ms: 0
initial_query_id: c3a1ac02-9cad-479b-af54-9e9c0a7afd70
view_name: default.matview_inner
view_uuid: 00000000-0000-0000-0000-000000000000
view_type: Materialized
view_query: SELECT * FROM default.table_b
view_target: default.`.inner.matview_inner`
read_rows: 4
read_bytes: 64
written_rows: 2
written_bytes: 32
peak_memory_usage: 4196188
ProfileEvents: {'FileOpen':2,'WriteBufferFromFileDescriptorWrite':2,'WriteBufferFromFileDescriptorWriteBytes':187,'IOBufferAllocs':3,'IOBufferAllocBytes':3145773,'FunctionExecute':3,'DiskWriteElapsedMicroseconds':13,'InsertedRows':2,'InsertedBytes':16,'SelectedRows':4,'SelectedBytes':48,'ContextLock':16,'RWLockAcquiredReadLocks':1,'RealTimeMicroseconds':698,'SoftPageFaults':4,'OSReadChars':463}
status: QueryFinish
exception_code: 0
exception:
stack_trace:
```
**См. также**
- [system.query_log](../../operations/system-tables/query_log.md#system_tables-query_log) — описание системной таблицы `query_log`, которая содержит общую информацию о выполненных запросах.
- [system.query_thread_log](../../operations/system-tables/query_thread_log.md#system_tables-query_thread_log) — описание системной таблицы `query_thread_log`, которая содержит информацию о каждом потоке выполнения запроса.

View File

@ -0,0 +1,66 @@
---
toc_priority: 311
toc_title: sparkbar
---
# sparkbar {#sparkbar}
Функция строит гистограмму частот по заданным значениям `x` и частоте повторения этих значений `y` на интервале `[min_x, max_x]`.
Если интервал для построения не указан, то в качестве нижней границы интервала будет взято минимальное значение `x`, а в качестве верхней границы — максимальное значение `x`.
**Синтаксис**
``` sql
sparkbar(width[, min_x, max_x])(x, y)
```
**Параметры**
- `width` — Количество столбцов гистограммы. Тип: [Integer](../../../sql-reference/data-types/int-uint.md).
- `min_x` — Начало интервала. Необязательный параметр.
- `max_x` — Конец интервала. Необязательный параметр.
**Аргументы**
- `x` — Поле со значениями.
- `y` — Поле с частотой повторения значений.
**Возвращаемые значения**
- Гистограмма частот.
**Пример**
Запрос:
``` sql
CREATE TABLE spark_bar_data (`cnt` UInt64,`event_date` Date) ENGINE = MergeTree ORDER BY event_date SETTINGS index_granularity = 8192;
INSERT INTO spark_bar_data VALUES(1,'2020-01-01'),(4,'2020-01-02'),(5,'2020-01-03'),(2,'2020-01-04'),(3,'2020-01-05'),(7,'2020-01-06'),(6,'2020-01-07'),(8,'2020-01-08'),(2,'2020-01-11');
SELECT sparkbar(9)(event_date,cnt) FROM spark_bar_data;
SELECT sparkbar(9,toDate('2020-01-01'),toDate('2020-01-10'))(event_date,cnt) FROM spark_bar_data;
```
Результат:
``` text
┌─sparkbar(9)(event_date, cnt)─┐
│ │
│ ▁▅▄▃██▅ ▁ │
│ │
└──────────────────────────────┘
┌─sparkbar(9, toDate('2020-01-01'), toDate('2020-01-10'))(event_date, cnt)─┐
│ │
│▁▄▄▂▅▇█▁ │
│ │
└──────────────────────────────────────────────────────────────────────────┘
```

View File

@ -357,7 +357,7 @@ Result:
## multiFuzzyMatchAny(haystack, distance, \[pattern<sub>1</sub>, pattern<sub>2</sub>, …, pattern<sub>n</sub>\]) {#multifuzzymatchanyhaystack-distance-pattern1-pattern2-patternn}
То же, что и `multiMatchAny`, но возвращает 1 если любой pattern соответствует haystack в пределах константного [редакционного расстояния](https://en.wikipedia.org/wiki/Edit_distance). Эта функция также находится в экспериментальном режиме и может быть очень медленной. За подробностями обращайтесь к [документации hyperscan](https://intel.github.io/hyperscan/dev-reference/compilation.html#approximate-matching).
То же, что и `multiMatchAny`, но возвращает 1 если любой шаблон соответствует haystack в пределах константного [редакционного расстояния](https://en.wikipedia.org/wiki/Edit_distance). Эта функция основана на экспериментальной библиотеке [hyperscan](https://intel.github.io/hyperscan/dev-reference/compilation.html#approximate-matching) и может быть медленной для некоторых частных случаев. Производительность зависит от значения редакционного расстояния и используемых шаблонов, но всегда медленнее по сравнению с non-fuzzy вариантами.
## multiFuzzyMatchAnyIndex(haystack, distance, \[pattern<sub>1</sub>, pattern<sub>2</sub>, …, pattern<sub>n</sub>\]) {#multifuzzymatchanyindexhaystack-distance-pattern1-pattern2-patternn}

View File

@ -0,0 +1,112 @@
---
toc_priority: 68
toc_title: Window View
---
# Window View 函数{#window-view-han-shu}
Window view函数用于获取窗口的起始(包含边界)和结束时间(不包含边界)。系统支持的window view函数如下
## tumble {#window-view-functions-tumble}
tumble窗口是连续的、不重叠的固定大小(`interval`)时间窗口。
``` sql
tumble(time_attr, interval [, timezone])
```
**参数**
- `time_attr` - [DateTime](../../sql-reference/data-types/datetime.md)类型的时间数据。
- `interval` - [Interval](../../sql-reference/data-types/special-data-types/interval.md)类型的窗口大小。
- `timezone` — [Timezone name](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone) 类型的时区(可选参数).
**返回值**
- tumble窗口的开始(包含边界)和结束时间(不包含边界)
类型: `Tuple(DateTime, DateTime)`
**示例**
查询:
``` sql
SELECT tumble(now(), toIntervalDay('1'))
```
结果:
``` text
┌─tumble(now(), toIntervalDay('1'))─────────────┐
│ ['2020-01-01 00:00:00','2020-01-02 00:00:00'] │
└───────────────────────────────────────────────┘
```
## hop {#window-view-functions-hop}
hop窗口是一个固定大小(`window_interval`)的时间窗口,并按照一个固定的滑动间隔(`hop_interval`)滑动。当滑动间隔小于窗口大小时,滑动窗口间存在重叠,此时一个数据可能存在于多个窗口。
``` sql
hop(time_attr, hop_interval, window_interval [, timezone])
```
**参数**
- `time_attr` - [DateTime](../../sql-reference/data-types/datetime.md)类型的时间数据。
- `hop_interval` - [Interval](../../sql-reference/data-types/special-data-types/interval.md)类型的滑动间隔需要大于0。
- `window_interval` - [Interval](../../sql-reference/data-types/special-data-types/interval.md)类型的窗口大小需要大于0。
- `timezone` — [Timezone name](../../operations/server-configuration-parameters/settings.md#server_configuration_parameters-timezone) 类型的时区(可选参数)。
**返回值**
- hop窗口的开始(包含边界)和结束时间(不包含边界)。由于一个数据可能存在于多个窗口脱离window view单独调用该函数时只返回第一个窗口数据。
类型: `Tuple(DateTime, DateTime)`
**示例**
查询:
``` sql
SELECT hop(now(), INTERVAL '1' SECOND, INTERVAL '2' SECOND)
```
结果:
``` text
┌─hop(now(), toIntervalSecond('1'), toIntervalSecond('2'))──┐
│ ('2020-01-14 16:58:22','2020-01-14 16:58:24') │
└───────────────────────────────────────────────────────────┘
```
## tumbleStart {#window-view-functions-tumblestart}
返回tumble窗口的开始时间(包含边界)。
``` sql
tumbleStart(time_attr, interval [, timezone]);
```
## tumbleEnd {#window-view-functions-tumbleend}
返回tumble窗口的结束时间(不包含边界)。
``` sql
tumbleEnd(time_attr, interval [, timezone]);
```
## hopStart {#window-view-functions-hopstart}
返回hop窗口的开始时间(包含边界)。
``` sql
hopStart(time_attr, hop_interval, window_interval [, timezone]);
```
## hopEnd {#window-view-functions-hopend}
返回hop窗口的结束时间(不包含边界)。
``` sql
hopEnd(time_attr, hop_interval, window_interval [, timezone]);
```

View File

@ -5,7 +5,7 @@ toc_title: VIEW
# CREATE VIEW {#create-view}
创建一个新视图。 有两种类型的视图:普通视图和物化视图。
创建一个新视图。 有两种类型的视图:普通视图物化视图Live视图和Window视图。
## Normal {#normal}
@ -241,3 +241,120 @@ Code: 60. DB::Exception: Received from localhost:9000. DB::Exception: Table defa
- 使用定期刷新从系统表中查看指标。
[原始文章](https://clickhouse.com/docs/en/sql-reference/statements/create/view/) <!--hide-->
## Window View [Experimental] {#window-view}
!!! important "重要"
这是一项试验性功能,可能会在未来版本中以向后不兼容的方式进行更改。
通过[allow_experimental_window_view](../../../operations/settings/settings.md#allow-experimental-window-view)启用window view以及`WATCH`语句。输入命令
`set allow_experimental_window_view = 1`
``` sql
CREATE WINDOW VIEW [IF NOT EXISTS] [db.]table_name [TO [db.]table_name] [ENGINE = engine] [WATERMARK = strategy] [ALLOWED_LATENESS = interval_function] AS SELECT ... GROUP BY window_view_function
```
Window view可以通过时间窗口聚合数据并在满足窗口触发条件时自动触发对应窗口计算。其通过将计算状态保存降低处理延迟支持将处理结果输出至目标表或通过`WATCH`语句输出至终端。
创建window view的方式和创建物化视图类似。Window view使用默认为`AggregatingMergeTree`的内部存储引擎存储计算中间状态。
### Window View 函数{#window-view-han-shu}
[Window view函数](../../functions/window-view-functions.md)用于获取窗口的起始和结束时间。Window view需要和window view函数配合使用。
### 时间属性{#window-view-shi-jian-shu-xing}
Window view 支持**处理时间**和**事件时间**两种时间类型。
**处理时间**为默认时间类型该模式下window view使用本地机器时间计算窗口数据。“处理时间”时间类型计算简单但具有不确定性。该模式下时间可以为window view函数的第一个参数`time_attr`,或通过函数`now()`使用当前机器时间。下面的例子展示了使用“处理时间”创建的window view的例子。
``` sql
CREATE WINDOW VIEW wv AS SELECT count(number), tumbleStart(w_id) as w_start from date GROUP BY tumble(now(), INTERVAL '5' SECOND) as w_id
```
**事件时间** 是事件真实发生的时间该时间往往在事件发生时便嵌入数据记录。事件时间处理提供较高的确定性可以处理乱序数据以及迟到数据。Window view 通过水位线(`WATERMARK`)启用事件时间处理。
Window view提供如下三种水位线策略
* `STRICTLY_ASCENDING`: 提交观测到的最大时间作为水位线,小于最大观测时间的数据不算迟到。
* `ASCENDING`: 提交观测到的最大时间减1作为水位线。小于或等于最大观测时间的数据不算迟到。
* `BOUNDED`: WATERMARK=INTERVAL. 提交最大观测时间减去固定间隔(`INTERVAL`)做为水位线。
以下为使用`WATERMARK`创建window view的示例
``` sql
CREATE WINDOW VIEW wv WATERMARK=STRICTLY_ASCENDING AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
CREATE WINDOW VIEW wv WATERMARK=ASCENDING AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
CREATE WINDOW VIEW wv WATERMARK=INTERVAL '3' SECOND AS SELECT count(number) FROM date GROUP BY tumble(timestamp, INTERVAL '5' SECOND);
```
通常窗口会在水位线到达时触发水位线到达之后的数据会被丢弃。Window view可以通过设置`ALLOWED_LATENESS=INTERVAL`来开启迟到消息处理。示例如下:
``` sql
CREATE WINDOW VIEW test.wv TO test.dst WATERMARK=ASCENDING ALLOWED_LATENESS=INTERVAL '2' SECOND AS SELECT count(a) AS count, tumbleEnd(wid) AS w_end FROM test.mt GROUP BY tumble(timestamp, INTERVAL '5' SECOND) AS wid;
```
需要注意的是迟到消息需要更新之前的处理结果。与在窗口结束时触发不同迟到消息到达时window view会立即触发计算。因此会导致同一个窗口输出多次计算结果。用户需要注意这种情况并消除重复结果。
### 新窗口监控{#window-view-xin-chuang-kou-jian-kong}
Window view可以通过`WATCH`语句将处理结果推送至终端,或通过`TO`语句将结果推送至数据表。
``` sql
WATCH [db.]name [LIMIT n]
```
`WATCH`语句和`LIVE VIEW`中的类似。支持设置`LIMIT`参数,输出消息数目达到`LIMIT`限制时结束查询。
### 设置{#window-view-she-zhi}
- `window_view_clean_interval`: window view清除过期数据间隔(单位为秒)。系统会定期清除过期数据,尚未触发的窗口数据不会被清除。
- `window_view_heartbeat_interval`: 用于判断watch查询活跃的心跳时间间隔。
### 示例{#window-view-shi-li}
假设我们需要每10秒统计一次`data`表中的点击日志,且`data`表的结构如下:
``` sql
CREATE TABLE data ( `id` UInt64, `timestamp` DateTime) ENGINE = Memory;
```
首先使用10秒大小的tumble函数创建window view。
``` sql
CREATE WINDOW VIEW wv as select count(id), tumbleStart(w_id) as window_start from data group by tumble(timestamp, INTERVAL '10' SECOND) as w_id
```
随后,我们使用`WATCH`语句获取计算结果。
``` sql
WATCH wv
```
当日志插入表`data`时,
``` sql
INSERT INTO data VALUES(1,now())
```
`WATCH`语句会输出如下结果:
``` text
┌─count(id)─┬────────window_start─┐
│ 1 │ 2020-01-14 16:56:40 │
└───────────┴─────────────────────┘
```
或者,我们可以通过`TO`关键字将处理结果输出至另一张表。
``` sql
CREATE WINDOW VIEW wv TO dst AS SELECT count(id), tumbleStart(w_id) as window_start FROM data GROUP BY tumble(timestamp, INTERVAL '10' SECOND) as w_id
```
ClickHouse测试中提供了更多的示例(以`*window_view*`命名)。
### Window View 使用场景{#window-view-shi-yong-chang-jing}
Window view 在以下场景有用:
* **监控**: 以时间维度聚合及处理数据,并将处理结果输出至目标表。用户可通过目标表获取并操作计算结果。
* **分析**: 以时间维度进行数据分析. 当数据源非常庞大时window view可以减少重复全表查询的计算量。

View File

@ -473,3 +473,7 @@ if (ENABLE_TESTS AND USE_GTEST)
add_custom_target (clickhouse-tests ALL DEPENDS ${CLICKHOUSE_UNIT_TESTS_TARGETS})
add_dependencies(clickhouse-bundle clickhouse-tests)
endif()
if (ENABLE_FUZZING)
add_compile_definitions(FUZZING_MODE=1)
endif ()

View File

@ -705,6 +705,12 @@ bool Client::processWithFuzzing(const String & full_query)
throw;
}
if (!orig_ast)
{
// Can't continue after a parsing error
return true;
}
// `USE db` should not be executed
// since this will break every query after `DROP db`
if (orig_ast->as<ASTUseQuery>())
@ -712,12 +718,6 @@ bool Client::processWithFuzzing(const String & full_query)
return true;
}
if (!orig_ast)
{
// Can't continue after a parsing error
return true;
}
// Don't repeat:
// - INSERT -- Because the tables may grow too big.
// - CREATE -- Because first we run the unmodified query, it will succeed,

View File

@ -17,3 +17,9 @@ clickhouse_program_add(local)
if(NOT CLICKHOUSE_ONE_SHARED)
target_link_libraries(clickhouse-local-lib PRIVATE clickhouse-server-lib)
endif()
if (ENABLE_FUZZING)
add_compile_definitions(FUZZING_MODE=1)
set (WITH_COVERAGE ON)
target_link_libraries(clickhouse-local-lib PRIVATE ${LIB_FUZZING_ENGINE})
endif ()

View File

@ -41,6 +41,10 @@
#include <base/argsToConfig.h>
#include <filesystem>
#if defined(FUZZING_MODE)
#include <Functions/getFuzzerData.h>
#endif
namespace fs = std::filesystem;
@ -407,10 +411,25 @@ try
std::cout << std::fixed << std::setprecision(3);
std::cerr << std::fixed << std::setprecision(3);
#if defined(FUZZING_MODE)
static bool first_time = true;
if (first_time)
{
if (queries_files.empty() && !config().has("query"))
{
std::cerr << "\033[31m" << "ClickHouse compiled in fuzzing mode." << "\033[0m" << std::endl;
std::cerr << "\033[31m" << "You have to provide a query with --query or --queries-file option." << "\033[0m" << std::endl;
std::cerr << "\033[31m" << "The query have to use function getFuzzerData() inside." << "\033[0m" << std::endl;
exit(1);
}
is_interactive = false;
#else
is_interactive = stdin_is_a_tty
&& (config().hasOption("interactive")
|| (!config().has("query") && !config().has("table-structure") && queries_files.empty()));
#endif
if (!is_interactive)
{
/// We will terminate process on error
@ -439,6 +458,11 @@ try
connect();
#ifdef FUZZING_MODE
first_time = false;
}
#endif
if (is_interactive && !delayed_interactive)
{
runInteractive();
@ -451,7 +475,9 @@ try
runInteractive();
}
#ifndef FUZZING_MODE
cleanup();
#endif
return Application::EXIT_OK;
}
catch (const DB::Exception & e)
@ -653,7 +679,7 @@ void LocalServer::processConfig()
}
static std::string getHelpHeader()
[[ maybe_unused ]] static std::string getHelpHeader()
{
return
"usage: clickhouse-local [initial table definition] [--query <query>]\n"
@ -669,7 +695,7 @@ static std::string getHelpHeader()
}
static std::string getHelpFooter()
[[ maybe_unused ]] static std::string getHelpFooter()
{
return
"Example printing memory used by each Unix user:\n"
@ -680,11 +706,23 @@ static std::string getHelpFooter()
}
void LocalServer::printHelpMessage(const OptionsDescription & options_description)
void LocalServer::printHelpMessage([[maybe_unused]] const OptionsDescription & options_description)
{
#if defined(FUZZING_MODE)
std::cout <<
"usage: clickhouse <clickhouse-local arguments> -- <libfuzzer arguments>\n"
"Note: It is important not to use only one letter keys with single dash for \n"
"for clickhouse-local arguments. It may work incorrectly.\n"
"ClickHouse is build with coverage guided fuzzer (libfuzzer) inside it.\n"
"You have to provide a query which contains getFuzzerData function.\n"
"This will take the data from fuzzing engine, pass it to getFuzzerData function and execute a query.\n"
"Each time the data will be different, and it will last until some segfault or sanitizer assertion is found. \n";
#else
std::cout << getHelpHeader() << "\n";
std::cout << options_description.main_description.value() << "\n";
std::cout << getHelpFooter() << "\n";
#endif
}
@ -781,3 +819,51 @@ int mainEntryClickHouseLocal(int argc, char ** argv)
return code ? code : 1;
}
}
#if defined(FUZZING_MODE)
std::optional<DB::LocalServer> fuzz_app;
extern "C" int LLVMFuzzerInitialize(int * pargc, char *** pargv)
{
int & argc = *pargc;
char ** argv = *pargv;
/// As a user you can add flags to clickhouse binary in fuzzing mode as follows
/// clickhouse <set of clickhouse-local specific flag> -- <set of libfuzzer flags>
/// Calculate the position of delimiter "--" that separates arguments
/// of clickhouse-local and libfuzzer
int pos_delim = argc;
for (int i = 0; i < argc; ++i)
{
if (strcmp(argv[i], "--") == 0)
{
pos_delim = i;
break;
}
}
/// Initialize clickhouse-local app
fuzz_app.emplace();
fuzz_app->init(pos_delim, argv);
/// We will leave clickhouse-local specific arguments as is, because libfuzzer will ignore
/// all keys starting with --
return 0;
}
extern "C" int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size)
try
{
auto input = String(reinterpret_cast<const char *>(data), size);
DB::FunctionGetFuzzerData::update(input);
fuzz_app->run();
return 0;
}
catch (...)
{
return 1;
}
#endif

View File

@ -88,6 +88,7 @@ namespace
using MainFunc = int (*)(int, char**);
#if !defined(FUZZING_MODE)
/// Add an item here to register new application
std::pair<const char *, MainFunc> clickhouse_applications[] =
@ -141,7 +142,6 @@ std::pair<const char *, MainFunc> clickhouse_applications[] =
{"hash-binary", mainEntryClickHouseHashBinary},
};
int printHelp(int, char **)
{
std::cerr << "Use one of the following commands:" << std::endl;
@ -150,7 +150,6 @@ int printHelp(int, char **)
return -1;
}
bool isClickhouseApp(const std::string & app_suffix, std::vector<char *> & argv)
{
/// Use app if the first arg 'app' is passed (the arg should be quietly removed)
@ -170,6 +169,7 @@ bool isClickhouseApp(const std::string & app_suffix, std::vector<char *> & argv)
std::string app_name = "clickhouse-" + app_suffix;
return !argv.empty() && (app_name == argv[0] || endsWith(argv[0], "/" + app_name));
}
#endif
enum class InstructionFail
@ -342,9 +342,13 @@ struct Checker
///
/// extern bool inside_main;
/// class C { C() { assert(inside_main); } };
#ifndef FUZZING_MODE
bool inside_main = false;
#else
bool inside_main = true;
#endif
#if !defined(FUZZING_MODE)
int main(int argc_, char ** argv_)
{
inside_main = true;
@ -375,3 +379,4 @@ int main(int argc_, char ** argv_)
return main_func(static_cast<int>(argv.size()), argv.data());
}
#endif

View File

@ -650,6 +650,38 @@
</replica>
</shard>
</test_shard_localhost>
<test_cluster_one_shard_three_replicas_localhost>
<shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.3</host>
<port>9000</port>
</replica>
</shard>
<!--shard>
<internal_replication>false</internal_replication>
<replica>
<host>127.0.0.1</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.2</host>
<port>9000</port>
</replica>
<replica>
<host>127.0.0.3</host>
<port>9000</port>
</replica>
</shard-->
</test_cluster_one_shard_three_replicas_localhost>
<test_cluster_two_shards_localhost>
<shard>
<replica>

View File

@ -20,6 +20,7 @@
namespace DB
{
struct Settings;
template <typename T> constexpr bool DecimalOrExtendedInt =
@ -42,39 +43,19 @@ struct AvgFraction
/// Invoked only is either Numerator or Denominator are Decimal.
Float64 NO_SANITIZE_UNDEFINED divideIfAnyDecimal(UInt32 num_scale, UInt32 denom_scale [[maybe_unused]]) const
{
if constexpr (is_decimal<Numerator> && is_decimal<Denominator>)
{
// According to the docs, num(S1) / denom(S2) would have scale S1
if constexpr (std::is_same_v<Numerator, Decimal256> && std::is_same_v<Denominator, Decimal128>)
///Special case as Decimal256 / Decimal128 = compile error (as Decimal128 is not parametrized by a wide
///int), but an __int128 instead
return DecimalUtils::convertTo<Float64>(
numerator / (denominator.template convertTo<Decimal256>()), num_scale);
else
return DecimalUtils::convertTo<Float64>(numerator / denominator, num_scale);
}
/// Numerator is always casted to Float64 to divide correctly if the denominator is not Float64.
Float64 num_converted;
Float64 numerator_float;
if constexpr (is_decimal<Numerator>)
num_converted = DecimalUtils::convertTo<Float64>(numerator, num_scale);
numerator_float = DecimalUtils::convertTo<Float64>(numerator, num_scale);
else
num_converted = static_cast<Float64>(numerator); /// all other types, including extended integral.
std::conditional_t<DecimalOrExtendedInt<Denominator>,
Float64, Denominator> denom_converted;
numerator_float = numerator;
Float64 denominator_float;
if constexpr (is_decimal<Denominator>)
denom_converted = DecimalUtils::convertTo<Float64>(denominator, denom_scale);
else if constexpr (DecimalOrExtendedInt<Denominator>)
/// no way to divide Float64 and extended integral type without an explicit cast.
denom_converted = static_cast<Float64>(denominator);
denominator_float = DecimalUtils::convertTo<Float64>(denominator, denom_scale);
else
denom_converted = denominator; /// can divide on float, no cast required.
denominator_float = denominator;
return num_converted / denom_converted;
return numerator_float / denominator_float;
}
Float64 NO_SANITIZE_UNDEFINED divide() const

View File

@ -82,17 +82,17 @@ createAggregateFunctionAvgWeighted(const std::string & name, const DataTypes & a
const bool left_decimal = isDecimal(data_type);
const bool right_decimal = isDecimal(data_type_weight);
/// We multiply value by weight, so actual scale of numerator is <scale of value> + <scale of weight>
if (left_decimal && right_decimal)
ptr.reset(create(*data_type, *data_type_weight,
argument_types,
getDecimalScale(*data_type), getDecimalScale(*data_type_weight)));
getDecimalScale(*data_type) + getDecimalScale(*data_type_weight), getDecimalScale(*data_type_weight)));
else if (left_decimal)
ptr.reset(create(*data_type, *data_type_weight, argument_types,
getDecimalScale(*data_type)));
else if (right_decimal)
ptr.reset(create(*data_type, *data_type_weight, argument_types,
// numerator is not decimal, so its scale is 0
0, getDecimalScale(*data_type_weight)));
getDecimalScale(*data_type_weight), getDecimalScale(*data_type_weight)));
else
ptr.reset(create(*data_type, *data_type_weight, argument_types));

View File

@ -517,6 +517,8 @@ if (USE_BZIP2)
target_include_directories (clickhouse_common_io SYSTEM BEFORE PRIVATE ${BZIP2_INCLUDE_DIR})
endif()
dbms_target_link_libraries(PUBLIC consistent-hashing)
include ("${ClickHouse_SOURCE_DIR}/cmake/add_check.cmake")
if (ENABLE_TESTS AND USE_GTEST)

View File

@ -603,6 +603,14 @@ void Connection::sendReadTaskResponse(const String & response)
out->next();
}
void Connection::sendMergeTreeReadTaskResponse(const PartitionReadResponse & response)
{
writeVarUInt(Protocol::Client::MergeTreeReadTaskResponse, *out);
response.serialize(*out);
out->next();
}
void Connection::sendPreparedData(ReadBuffer & input, size_t size, const String & name)
{
/// NOTE 'Throttler' is not used in this method (could use, but it's not important right now).
@ -872,6 +880,10 @@ Packet Connection::receivePacket()
case Protocol::Server::ReadTaskRequest:
return res;
case Protocol::Server::MergeTreeReadTaskRequest:
res.request = receivePartitionReadRequest();
return res;
case Protocol::Server::ProfileEvents:
res.block = receiveProfileEvents();
return res;
@ -1023,6 +1035,13 @@ ProfileInfo Connection::receiveProfileInfo() const
return profile_info;
}
PartitionReadRequest Connection::receivePartitionReadRequest() const
{
PartitionReadRequest request;
request.deserialize(*in);
return request;
}
void Connection::throwUnexpectedPacket(UInt64 packet_type, const char * expected) const
{

View File

@ -16,6 +16,8 @@
#include <Compression/ICompressionCodec.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <atomic>
#include <optional>
@ -104,6 +106,8 @@ public:
void sendData(const Block & block, const String & name/* = "" */, bool scalar/* = false */) override;
void sendMergeTreeReadTaskResponse(const PartitionReadResponse & response) override;
void sendExternalTablesData(ExternalTablesData & data) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
@ -255,6 +259,7 @@ private:
std::vector<String> receiveMultistringMessage(UInt64 msg_type) const;
std::unique_ptr<Exception> receiveException() const;
Progress receiveProgress() const;
PartitionReadRequest receivePartitionReadRequest() const;
ProfileInfo receiveProfileInfo() const;
void initInputBuffers();

View File

@ -132,7 +132,7 @@ void HedgedConnections::sendQuery(
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
ClientInfo & client_info,
bool with_pending_data)
{
std::lock_guard lock(cancel_mutex);
@ -171,7 +171,9 @@ void HedgedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
if (offset_states.size() > 1)
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
if (offset_states.size() > 1 && enable_sample_offset_parallel_processing)
{
modified_settings.parallel_replicas_count = offset_states.size();
modified_settings.parallel_replica_offset = fd_to_replica_location[replica.packet_receiver->getFileDescriptor()].offset;

View File

@ -86,7 +86,7 @@ public:
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponse(const String &) override
@ -94,6 +94,11 @@ public:
throw Exception("sendReadTaskResponse in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
}
void sendMergeTreeReadTaskResponse(PartitionReadResponse) override
{
throw Exception("sendMergeTreeReadTaskResponse in not supported with HedgedConnections", ErrorCodes::LOGICAL_ERROR);
}
Packet receivePacket() override;
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
@ -112,6 +117,8 @@ public:
bool hasActiveConnections() const override { return active_connection_count > 0; }
void setReplicaInfo(ReplicaInfo value) override { replica_info = value; }
private:
/// If we don't receive data from replica and there is no progress in query
/// execution for receive_data_timeout, we are trying to get new
@ -199,6 +206,8 @@ private:
bool sent_query = false;
bool cancelled = false;
ReplicaInfo replica_info;
mutable std::mutex cancel_mutex;
};

View File

@ -1,6 +1,9 @@
#pragma once
#include <compare>
#include <Client/Connection.h>
#include <Storages/MergeTree/RequestResponse.h>
namespace DB
{
@ -27,10 +30,11 @@ public:
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
ClientInfo & client_info,
bool with_pending_data) = 0;
virtual void sendReadTaskResponse(const String &) = 0;
virtual void sendMergeTreeReadTaskResponse(PartitionReadResponse response) = 0;
/// Get packet from any replica.
virtual Packet receivePacket() = 0;
@ -56,6 +60,17 @@ public:
/// Get the replica addresses as a string.
virtual std::string dumpAddresses() const = 0;
struct ReplicaInfo
{
size_t all_replicas_count{0};
size_t number_of_current_replica{0};
};
/// This is needed in max_parallel_replicas case.
/// We create a RemoteQueryExecutor for each replica
virtual void setReplicaInfo(ReplicaInfo value) = 0;
/// Returns the number of replicas.
virtual size_t size() const = 0;

View File

@ -12,6 +12,8 @@
#include <IO/ConnectionTimeouts.h>
#include <IO/Progress.h>
#include <Storages/MergeTree/RequestResponse.h>
#include <boost/noncopyable.hpp>
@ -32,10 +34,13 @@ struct Packet
Progress progress;
ProfileInfo profile_info;
std::vector<UUID> part_uuids;
PartitionReadRequest request;
PartitionReadResponse response;
Packet() : type(Protocol::Server::Hello) {}
};
/// Struct which represents data we are going to send for external table.
struct ExternalTableData
{
@ -96,6 +101,8 @@ public:
/// Send all contents of external (temporary) tables.
virtual void sendExternalTablesData(ExternalTablesData & data) = 0;
virtual void sendMergeTreeReadTaskResponse(const PartitionReadResponse & response) = 0;
/// Check, if has data to read.
virtual bool poll(size_t timeout_microseconds) = 0;

View File

@ -424,6 +424,11 @@ void LocalConnection::sendExternalTablesData(ExternalTablesData &)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse &)
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress)
{
return std::make_unique<LocalConnection>(current_context, send_progress);

View File

@ -92,6 +92,8 @@ public:
void sendExternalTablesData(ExternalTablesData &) override;
void sendMergeTreeReadTaskResponse(const PartitionReadResponse & response) override;
bool poll(size_t timeout_microseconds/* = 0 */) override;
bool hasReadPendingData() const override;

View File

@ -1,9 +1,10 @@
#include <Client/MultiplexedConnections.h>
#include <Common/thread_local_rng.h>
#include <Core/Protocol.h>
#include <IO/ConnectionTimeouts.h>
#include <IO/Operators.h>
#include <Common/thread_local_rng.h>
#include "Core/Protocol.h"
#include <Interpreters/ClientInfo.h>
namespace DB
{
@ -110,7 +111,7 @@ void MultiplexedConnections::sendQuery(
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
ClientInfo & client_info,
bool with_pending_data)
{
std::lock_guard lock(cancel_mutex);
@ -131,16 +132,29 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold = 0;
modified_settings.group_by_two_level_threshold_bytes = 0;
}
if (settings.allow_experimental_parallel_reading_from_replicas)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info.all_replicas_count;
client_info.number_of_current_replica = replica_info.number_of_current_replica;
}
}
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
size_t num_replicas = replica_states.size();
if (num_replicas > 1)
{
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
if (enable_sample_offset_parallel_processing)
/// Use multiple replicas for parallel query processing.
modified_settings.parallel_replicas_count = num_replicas;
for (size_t i = 0; i < num_replicas; ++i)
{
modified_settings.parallel_replica_offset = i;
if (enable_sample_offset_parallel_processing)
modified_settings.parallel_replica_offset = i;
replica_states[i].connection->sendQuery(timeouts, query, query_id,
stage, &modified_settings, &client_info, with_pending_data);
}
@ -179,6 +193,16 @@ void MultiplexedConnections::sendReadTaskResponse(const String & response)
current_connection->sendReadTaskResponse(response);
}
void MultiplexedConnections::sendMergeTreeReadTaskResponse(PartitionReadResponse response)
{
std::lock_guard lock(cancel_mutex);
if (cancelled)
return;
current_connection->sendMergeTreeReadTaskResponse(response);
}
Packet MultiplexedConnections::receivePacket()
{
std::lock_guard lock(cancel_mutex);
@ -234,6 +258,7 @@ Packet MultiplexedConnections::drain()
switch (packet.type)
{
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:
@ -313,6 +338,7 @@ Packet MultiplexedConnections::receivePacketUnlocked(AsyncCallback async_callbac
switch (packet.type)
{
case Protocol::Server::MergeTreeReadTaskRequest:
case Protocol::Server::ReadTaskRequest:
case Protocol::Server::PartUUIDs:
case Protocol::Server::Data:

View File

@ -38,10 +38,11 @@ public:
const String & query,
const String & query_id,
UInt64 stage,
const ClientInfo & client_info,
ClientInfo & client_info,
bool with_pending_data) override;
void sendReadTaskResponse(const String &) override;
void sendMergeTreeReadTaskResponse(PartitionReadResponse response) override;
Packet receivePacket() override;
@ -62,6 +63,7 @@ public:
/// Without locking, because sendCancel() does not change the state of the replicas.
bool hasActiveConnections() const override { return active_connection_count > 0; }
void setReplicaInfo(ReplicaInfo value) override { replica_info = value; }
private:
Packet receivePacketUnlocked(AsyncCallback async_callback, bool is_draining) override;
@ -102,6 +104,8 @@ private:
bool sent_query = false;
bool cancelled = false;
ReplicaInfo replica_info;
/// A mutex for the sendCancel function to execute safely
/// in separate thread.
mutable std::mutex cancel_mutex;

View File

@ -163,4 +163,3 @@ protected:
/** Creates a new object to put into the pool. */
virtual ObjectPtr allocObject() = 0;
};

View File

@ -64,24 +64,26 @@ namespace Protocol
{
enum Enum
{
Hello = 0, /// Name, version, revision.
Data = 1, /// A block of data (compressed or not).
Exception = 2, /// The exception during query execution.
Progress = 3, /// Query execution progress: rows read, bytes read.
Pong = 4, /// Ping response
EndOfStream = 5, /// All packets were transmitted
ProfileInfo = 6, /// Packet with profiling info.
Totals = 7, /// A block with totals (compressed or not).
Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
PartUUIDs = 12, /// List of unique parts ids.
ReadTaskRequest = 13, /// String (UUID) describes a request for which next task is needed
/// This is such an inverted logic, where server sends requests
/// And client returns back response
ProfileEvents = 14, /// Packet with profile events from server.
MAX = ProfileEvents,
Hello = 0, /// Name, version, revision.
Data = 1, /// A block of data (compressed or not).
Exception = 2, /// The exception during query execution.
Progress = 3, /// Query execution progress: rows read, bytes read.
Pong = 4, /// Ping response
EndOfStream = 5, /// All packets were transmitted
ProfileInfo = 6, /// Packet with profiling info.
Totals = 7, /// A block with totals (compressed or not).
Extremes = 8, /// A block with minimums and maximums (compressed or not).
TablesStatusResponse = 9, /// A response to TablesStatus request.
Log = 10, /// System logs of the query execution
TableColumns = 11, /// Columns' description for default values calculation
PartUUIDs = 12, /// List of unique parts ids.
ReadTaskRequest = 13, /// String (UUID) describes a request for which next task is needed
/// This is such an inverted logic, where server sends requests
/// And client returns back response
ProfileEvents = 14, /// Packet with profile events from server.
MergeTreeReadTaskRequest = 15, /// Request from a MergeTree replica to a coordinator
MAX = MergeTreeReadTaskRequest,
};
/// NOTE: If the type of packet argument would be Enum, the comparison packet >= 0 && packet < 10
@ -106,6 +108,7 @@ namespace Protocol
"PartUUIDs",
"ReadTaskRequest",
"ProfileEvents",
"MergeTreeReadTaskRequest",
};
return packet <= MAX
? data[packet]
@ -130,20 +133,20 @@ namespace Protocol
{
enum Enum
{
Hello = 0, /// Name, version, revision, default DB
Query = 1, /// Query id, query settings, stage up to which the query must be executed,
/// whether the compression must be used,
/// query text (without data for INSERTs).
Data = 2, /// A block of data (compressed or not).
Cancel = 3, /// Cancel the query execution.
Ping = 4, /// Check that connection to the server is alive.
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6, /// Keep the connection alive
Scalar = 7, /// A block of data (compressed or not).
IgnoredPartUUIDs = 8, /// List of unique parts ids to exclude from query processing
ReadTaskResponse = 9, /// TODO:
MAX = ReadTaskResponse,
Hello = 0, /// Name, version, revision, default DB
Query = 1, /// Query id, query settings, stage up to which the query must be executed,
/// whether the compression must be used,
/// query text (without data for INSERTs).
Data = 2, /// A block of data (compressed or not).
Cancel = 3, /// Cancel the query execution.
Ping = 4, /// Check that connection to the server is alive.
TablesStatusRequest = 5, /// Check status of tables on the server.
KeepAlive = 6, /// Keep the connection alive
Scalar = 7, /// A block of data (compressed or not).
IgnoredPartUUIDs = 8, /// List of unique parts ids to exclude from query processing
ReadTaskResponse = 9, /// A filename to read from s3 (used in s3Cluster)
MergeTreeReadTaskResponse = 10, /// Coordinator's decision with a modified set of mark ranges allowed to read
MAX = MergeTreeReadTaskResponse,
};
inline const char * toString(UInt64 packet)
@ -159,6 +162,7 @@ namespace Protocol
"Scalar",
"IgnoredPartUUIDs",
"ReadTaskResponse",
"MergeTreeReadTaskResponse"
};
return packet <= MAX
? data[packet]

View File

@ -31,6 +31,9 @@
#define DBMS_CLUSTER_PROCESSING_PROTOCOL_VERSION 1
#define DBMS_PARALLEL_REPLICAS_PROTOCOL_VERSION 1
#define DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS 54453
/// Minimum revision supporting interserver secret.
#define DBMS_MIN_REVISION_WITH_INTERSERVER_SECRET 54441
@ -48,6 +51,7 @@
/// NOTE: DBMS_TCP_PROTOCOL_VERSION has nothing common with VERSION_REVISION,
/// later is just a number for server version (one number instead of commit SHA)
/// for simplicity (sometimes it may be more convenient in some use cases).
#define DBMS_TCP_PROTOCOL_VERSION 54452
#define DBMS_TCP_PROTOCOL_VERSION 54453
#define DBMS_MIN_PROTOCOL_VERSION_WITH_INITIAL_QUERY_START_TIME 54449

View File

@ -75,6 +75,7 @@ class IColumn;
M(UInt64, s3_max_single_read_retries, 4, "The maximum number of retries during single S3 read.", 0) \
M(UInt64, s3_max_redirects, 10, "Max number of S3 redirects hops allowed.", 0) \
M(UInt64, s3_max_connections, 1024, "The maximum number of connections per server.", 0) \
M(UInt64, hdfs_replication, 0, "The actual number of replications can be specified when the hdfs file is created.", 0) \
M(UInt64, hsts_max_age, 0, "Expired time for hsts. 0 means disable HSTS.", 0) \
M(Bool, extremes, false, "Calculate minimums and maximums of the result columns. They can be output in JSON-formats.", IMPORTANT) \
M(Bool, use_uncompressed_cache, false, "Whether to use the cache of uncompressed blocks.", 0) \
@ -125,6 +126,8 @@ class IColumn;
M(UInt64, parallel_replicas_count, 0, "", 0) \
M(UInt64, parallel_replica_offset, 0, "", 0) \
\
M(Bool, allow_experimental_parallel_reading_from_replicas, false, "If true, ClickHouse will send a SELECT query to all replicas of a table. It will work for any kind on MergeTree table.", 0) \
\
M(Bool, skip_unavailable_shards, false, "If true, ClickHouse silently skips unavailable shards and nodes unresolvable through DNS. Shard is marked as unavailable when none of the replicas can be reached.", 0) \
\
M(UInt64, parallel_distributed_insert_select, 0, "Process distributed INSERT SELECT query in the same cluster on local tables on every shard, if 1 SELECT is executed on each shard, if 2 SELECT and INSERT is executed on each shard", 0) \

View File

@ -139,17 +139,20 @@ static DataTypePtr create(const ASTPtr & arguments)
if (!arguments || arguments->children.empty())
throw Exception("Data type AggregateFunction requires parameters: "
"name of aggregate function and list of data types for arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
"version(optionally), name of aggregate function and list of data types for arguments", ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH);
ASTPtr data_type_ast = arguments->children[0];
size_t argument_types_start_idx = 1;
/* If aggregate function definition doesn't have version, it will have in AST children args [ASTFunction, types...] - in case
* it is parametric, or [ASTIdentifier, types...] - otherwise. If aggregate function has version in AST, then it will be:
* [ASTLitearl, ASTFunction (or ASTIdentifier), types...].
* [ASTLiteral, ASTFunction (or ASTIdentifier), types...].
*/
if (auto * version_ast = arguments->children[0]->as<ASTLiteral>())
{
if (arguments->children.size() < 2)
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Data type AggregateFunction has version, but it requires at least one more parameter - name of aggregate function");
version = version_ast->value.safeGet<UInt64>();
data_type_ast = arguments->children[1];
argument_types_start_idx = 2;

View File

@ -91,5 +91,6 @@ void registerDataTypeString(DataTypeFactory & factory)
factory.registerAlias("NCHAR LARGE OBJECT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BINARY LARGE OBJECT", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("BINARY VARYING", "String", DataTypeFactory::CaseInsensitive);
factory.registerAlias("VARBINARY", "String", DataTypeFactory::CaseInsensitive);
}
}

View File

@ -11,7 +11,7 @@
namespace DB
{
TableNamesSet getDependenciesSetFromCreateQuery(ContextPtr global_context, const ASTPtr & ast)
TableNamesSet getDependenciesSetFromCreateQuery(ContextPtr global_context, const QualifiedTableName & table, const ASTPtr & ast)
{
assert(global_context == global_context->getGlobalContext());
TableLoadingDependenciesVisitor::Data data;
@ -20,6 +20,7 @@ TableNamesSet getDependenciesSetFromCreateQuery(ContextPtr global_context, const
data.global_context = global_context;
TableLoadingDependenciesVisitor visitor{data};
visitor.visit(ast);
data.dependencies.erase(table);
return data.dependencies;
}
@ -132,7 +133,10 @@ void DDLDependencyVisitor::extractTableNameFromArgument(const ASTFunction & func
}
if (qualified_name.database.empty())
{
/// It can be table/dictionary from default database or XML dictionary, but we cannot distinguish it here.
qualified_name.database = data.default_database;
}
data.dependencies.emplace(std::move(qualified_name));
}

View File

@ -12,7 +12,7 @@ class ASTStorage;
using TableNamesSet = std::unordered_set<QualifiedTableName>;
TableNamesSet getDependenciesSetFromCreateQuery(ContextPtr global_context, const ASTPtr & ast);
TableNamesSet getDependenciesSetFromCreateQuery(ContextPtr global_context, const QualifiedTableName & table, const ASTPtr & ast);
/// Visits ASTCreateQuery and extracts names of table (or dictionary) dependencies
/// from column default expressions (joinGet, dictGet, etc)

View File

@ -269,6 +269,7 @@ StoragePtr DatabaseLazy::loadTable(const String & table_name) const
}
void DatabaseLazy::clearExpiredTables() const
try
{
std::lock_guard lock(mutex);
auto time_now = std::chrono::system_clock::to_time_t(std::chrono::system_clock::now());
@ -303,6 +304,10 @@ void DatabaseLazy::clearExpiredTables() const
cache_expiration_queue.splice(cache_expiration_queue.begin(), busy_tables, busy_tables.begin(), busy_tables.end());
}
catch (...)
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
}
DatabaseLazyIterator::DatabaseLazyIterator(const DatabaseLazy & database_, Strings && table_names_)

View File

@ -121,7 +121,7 @@ void DatabaseMemory::alterTable(ContextPtr local_context, const StorageID & tabl
throw Exception(ErrorCodes::UNKNOWN_TABLE, "Cannot alter: There is no metadata of table {}", table_id.getNameForLogs());
applyMetadataChangesToCreateQuery(it->second, metadata);
TableNamesSet new_dependencies = getDependenciesSetFromCreateQuery(local_context->getGlobalContext(), it->second);
TableNamesSet new_dependencies = getDependenciesSetFromCreateQuery(local_context->getGlobalContext(), table_id.getQualifiedName(), it->second);
DatabaseCatalog::instance().updateLoadingDependencies(table_id, std::move(new_dependencies));
}

View File

@ -181,8 +181,8 @@ void DatabaseOrdinary::loadTablesMetadata(ContextPtr local_context, ParsedTables
return;
}
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext(), ast);
QualifiedTableName qualified_name{database_name, create_query->getTable()};
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext(), qualified_name, ast);
std::lock_guard lock{metadata.mutex};
metadata.parsed_tables[qualified_name] = ParsedTableMetadata{full_path.string(), ast};
@ -297,7 +297,7 @@ void DatabaseOrdinary::alterTable(ContextPtr local_context, const StorageID & ta
out.close();
}
TableNamesSet new_dependencies = getDependenciesSetFromCreateQuery(local_context->getGlobalContext(), ast);
TableNamesSet new_dependencies = getDependenciesSetFromCreateQuery(local_context->getGlobalContext(), table_id.getQualifiedName(), ast);
DatabaseCatalog::instance().updateLoadingDependencies(table_id, std::move(new_dependencies));
commitAlterTable(table_id, table_metadata_tmp_path, table_metadata_path, statement, local_context);

View File

@ -133,10 +133,14 @@ void TablesLoader::removeUnresolvableDependencies(bool remove_loaded)
/// Table exists and it's already loaded
if (DatabaseCatalog::instance().isTableExist(StorageID(dependency_name.database, dependency_name.table), global_context))
return remove_loaded;
/// It's XML dictionary. It was loaded before tables and DDL dictionaries.
/// It's XML dictionary.
if (dependency_name.database == metadata.default_database &&
global_context->getExternalDictionariesLoader().has(dependency_name.table))
return remove_loaded;
{
LOG_WARNING(log, "Tables {} depend on XML dictionary {}, but XML dictionaries are loaded independently."
"Consider converting it to DDL dictionary.", fmt::join(info.dependent_database_objects, ", "), dependency_name);
return true;
}
/// Some tables depends on table "dependency_name", but there is no such table in DatabaseCatalog and we don't have its metadata.
/// We will ignore it and try to load dependent tables without "dependency_name"

View File

@ -97,7 +97,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskHDFS::writeFile(const String & path
/// Single O_WRONLY in libhdfs adds O_TRUNC
auto hdfs_buffer = std::make_unique<WriteBufferFromHDFS>(hdfs_path,
config, buf_size,
config, settings->replication, buf_size,
mode == WriteMode::Rewrite ? O_WRONLY : O_WRONLY | O_APPEND);
return std::make_unique<WriteIndirectBufferFromRemoteFS<WriteBufferFromHDFS>>(std::move(hdfs_buffer),
@ -142,12 +142,13 @@ bool DiskHDFS::checkUniqueId(const String & hdfs_uri) const
namespace
{
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix)
std::unique_ptr<DiskHDFSSettings> getSettings(const Poco::Util::AbstractConfiguration & config, const String & config_prefix, const Settings & settings)
{
return std::make_unique<DiskHDFSSettings>(
config.getUInt64(config_prefix + ".min_bytes_for_seek", 1024 * 1024),
config.getInt(config_prefix + ".thread_pool_size", 16),
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000));
config.getInt(config_prefix + ".objects_chunk_size_to_delete", 1000),
settings.hdfs_replication);
}
}
@ -173,7 +174,7 @@ void registerDiskHDFS(DiskFactory & factory)
return std::make_shared<DiskHDFS>(
name, uri,
getSettings(config, config_prefix),
getSettings(config, config_prefix, context_->getSettingsRef()),
metadata_disk, config);
};

View File

@ -14,14 +14,17 @@ struct DiskHDFSSettings
size_t min_bytes_for_seek;
int thread_pool_size;
int objects_chunk_size_to_delete;
int replication;
DiskHDFSSettings(
int min_bytes_for_seek_,
int thread_pool_size_,
int objects_chunk_size_to_delete_)
int objects_chunk_size_to_delete_,
int replication_)
: min_bytes_for_seek(min_bytes_for_seek_)
, thread_pool_size(thread_pool_size_)
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_) {}
, objects_chunk_size_to_delete(objects_chunk_size_to_delete_)
, replication(replication_) {}
};

View File

@ -304,6 +304,26 @@ OutputFormatPtr FormatFactory::getOutputFormat(
return format;
}
String FormatFactory::getContentType(
const String & name,
ContextPtr context,
const std::optional<FormatSettings> & _format_settings) const
{
const auto & output_getter = getCreators(name).output_creator;
if (!output_getter)
throw Exception(ErrorCodes::FORMAT_IS_NOT_SUITABLE_FOR_OUTPUT, "Format {} is not suitable for output (with processors)", name);
auto format_settings = _format_settings ? *_format_settings : getFormatSettings(context);
Block empty_block;
RowOutputFormatParams empty_params;
WriteBufferFromOwnString empty_buffer;
auto format = output_getter(empty_buffer, empty_block, empty_params, format_settings);
return format->getContentType();
}
void FormatFactory::registerInputFormat(const String & name, InputCreator input_creator)
{
auto & target = dict[name].input_creator;

View File

@ -131,6 +131,11 @@ public:
const Block & sample,
ContextPtr context,
WriteCallback callback = {},
const std::optional<FormatSettings> & _format_settings = std::nullopt) const;
String getContentType(
const String & name,
ContextPtr context,
const std::optional<FormatSettings> & format_settings = std::nullopt) const;
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);

View File

@ -123,3 +123,7 @@ set_source_files_properties("pointInPolygon.cpp" PROPERTIES COMPILE_FLAGS -fno-s
# target_link_libraries(clickhouse_functions PRIVATE ${S2_LIBRARY})
target_include_directories(clickhouse_functions SYSTEM PUBLIC ${S2_GEOMETRY_INCLUDE_DIR})
if (ENABLE_FUZZING)
add_compile_definitions(FUZZING_MODE=1)
endif ()

View File

@ -116,7 +116,7 @@ namespace
template <>
struct WindowImpl<TUMBLE>
{
static constexpr auto name = "TUMBLE";
static constexpr auto name = "tumble";
[[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -213,7 +213,7 @@ struct WindowImpl<TUMBLE>
template <>
struct WindowImpl<TUMBLE_START>
{
static constexpr auto name = "TUMBLE_START";
static constexpr auto name = "tumbleStart";
static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -238,12 +238,18 @@ struct WindowImpl<TUMBLE_START>
[[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
const auto which_type = WhichDataType(arguments[0].type);
const auto & time_column = arguments[0];
const auto which_type = WhichDataType(time_column.type);
ColumnPtr result_column;
if (which_type.isDateTime())
result_column= WindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
if (arguments.size() == 1)
{
if (which_type.isUInt32())
return time_column.column;
else //isTuple
result_column = time_column.column;
}
else
result_column = arguments[0].column;
result_column = WindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
return executeWindowBound(result_column, 0, function_name);
}
};
@ -251,7 +257,7 @@ struct WindowImpl<TUMBLE_START>
template <>
struct WindowImpl<TUMBLE_END>
{
static constexpr auto name = "TUMBLE_END";
static constexpr auto name = "tumbleEnd";
[[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -260,12 +266,18 @@ struct WindowImpl<TUMBLE_END>
[[maybe_unused]] static ColumnPtr dispatchForColumns(const ColumnsWithTypeAndName & arguments, const String& function_name)
{
const auto which_type = WhichDataType(arguments[0].type);
const auto & time_column = arguments[0];
const auto which_type = WhichDataType(time_column.type);
ColumnPtr result_column;
if (which_type.isDateTime())
result_column = WindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
if (arguments.size() == 1)
{
if (which_type.isUInt32())
return time_column.column;
else //isTuple
result_column = time_column.column;
}
else
result_column = arguments[0].column;
result_column = WindowImpl<TUMBLE>::dispatchForColumns(arguments, function_name);
return executeWindowBound(result_column, 1, function_name);
}
};
@ -273,7 +285,7 @@ struct WindowImpl<TUMBLE_END>
template <>
struct WindowImpl<HOP>
{
static constexpr auto name = "HOP";
static constexpr auto name = "hop";
[[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -405,7 +417,7 @@ struct WindowImpl<HOP>
template <>
struct WindowImpl<WINDOW_ID>
{
static constexpr auto name = "WINDOW_ID";
static constexpr auto name = "windowID";
[[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -557,7 +569,7 @@ struct WindowImpl<WINDOW_ID>
template <>
struct WindowImpl<HOP_START>
{
static constexpr auto name = "HOP_START";
static constexpr auto name = "hopStart";
static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{
@ -600,7 +612,7 @@ struct WindowImpl<HOP_START>
template <>
struct WindowImpl<HOP_END>
{
static constexpr auto name = "HOP_END";
static constexpr auto name = "hopEnd";
[[maybe_unused]] static DataTypePtr getReturnType(const ColumnsWithTypeAndName & arguments, const String & function_name)
{

View File

@ -9,25 +9,25 @@ namespace DB
/** Window functions:
*
* TUMBLE(time_attr, interval [, timezone])
* tumble(time_attr, interval [, timezone])
*
* TUMBLE_START(window_id)
* tumbleStart(window_id)
*
* TUMBLE_START(time_attr, interval [, timezone])
* tumbleStart(time_attr, interval [, timezone])
*
* TUMBLE_END(window_id)
* tumbleEnd(window_id)
*
* TUMBLE_END(time_attr, interval [, timezone])
* tumbleEnd(time_attr, interval [, timezone])
*
* HOP(time_attr, hop_interval, window_interval [, timezone])
* hop(time_attr, hop_interval, window_interval [, timezone])
*
* HOP_START(window_id)
* hopStart(window_id)
*
* HOP_START(time_attr, hop_interval, window_interval [, timezone])
* hopStart(time_attr, hop_interval, window_interval [, timezone])
*
* HOP_END(window_id)
* hopEnd(window_id)
*
* HOP_END(time_attr, hop_interval, window_interval [, timezone])
* hopEnd(time_attr, hop_interval, window_interval [, timezone])
*
*/
enum WindowFunctionName

View File

@ -0,0 +1,11 @@
#include <Functions/getFuzzerData.h>
namespace DB
{
void registerFunctionGetFuzzerData(FunctionFactory & factory)
{
factory.registerFunction<FunctionGetFuzzerData>();
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <Functions/IFunction.h>
#include <Functions/FunctionFactory.h>
#include <DataTypes/DataTypeString.h>
#include <Core/Field.h>
namespace DB
{
class FunctionGetFuzzerData : public IFunction
{
inline static String fuzz_data;
public:
static constexpr auto name = "getFuzzerData";
inline static FunctionPtr create(ContextPtr) { return create(); }
static FunctionPtr create()
{
return std::make_shared<FunctionGetFuzzerData>();
}
inline String getName() const override { return name; }
inline size_t getNumberOfArguments() const override { return 0; }
DataTypePtr getReturnTypeImpl(const DataTypes & /*arguments*/) const override
{
return std::make_shared<DataTypeString>();
}
inline bool isDeterministic() const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
ColumnPtr executeImpl(const ColumnsWithTypeAndName &,
const DataTypePtr &,
size_t input_rows_count) const override
{
return DataTypeString().createColumnConst(input_rows_count, fuzz_data);
}
static void update(const String & fuzz_data_)
{
fuzz_data = fuzz_data_;
}
};
}

View File

@ -85,6 +85,10 @@ void registerFunctionGetOSKernelVersion(FunctionFactory &);
void registerFunctionConvertCharset(FunctionFactory &);
#endif
#ifdef FUZZING_MODE
void registerFunctionGetFuzzerData(FunctionFactory & factory);
#endif
void registerFunctionsMiscellaneous(FunctionFactory & factory)
{
registerFunctionCurrentDatabase(factory);
@ -166,6 +170,10 @@ void registerFunctionsMiscellaneous(FunctionFactory & factory)
#if USE_ICU
registerFunctionConvertCharset(factory);
#endif
#ifdef FUZZING_MODE
registerFunctionGetFuzzerData(factory);
#endif
}
}

View File

@ -7,7 +7,11 @@ namespace DB
{
WriteBufferFromHTTP::WriteBufferFromHTTP(
const Poco::URI & uri, const std::string & method, const ConnectionTimeouts & timeouts, size_t buffer_size_)
const Poco::URI & uri,
const std::string & method,
const std::string & content_type,
const ConnectionTimeouts & timeouts,
size_t buffer_size_)
: WriteBufferFromOStream(buffer_size_)
, session{makeHTTPSession(uri, timeouts)}
, request{method, uri.getPathAndQuery(), Poco::Net::HTTPRequest::HTTP_1_1}
@ -15,6 +19,11 @@ WriteBufferFromHTTP::WriteBufferFromHTTP(
request.setHost(uri.getHost());
request.setChunkedTransferEncoding(true);
if (!content_type.empty())
{
request.set("Content-Type", content_type);
}
LOG_TRACE((&Poco::Logger::get("WriteBufferToHTTP")), "Sending request to {}", uri.toString());
ostr = &session->sendRequest(request);

View File

@ -20,6 +20,7 @@ class WriteBufferFromHTTP : public WriteBufferFromOStream
public:
explicit WriteBufferFromHTTP(const Poco::URI & uri,
const std::string & method = Poco::Net::HTTPRequest::HTTP_POST, // POST or PUT only
const std::string & content_type = "",
const ConnectionTimeouts & timeouts = {},
size_t buffer_size_ = DBMS_DEFAULT_BUFFER_SIZE);

View File

@ -118,6 +118,7 @@ inline void writeStringBinary(const std::string_view & s, WriteBuffer & buf)
writeStringBinary(StringRef{s}, buf);
}
template <typename T>
void writeVectorBinary(const std::vector<T> & v, WriteBuffer & buf)
{

View File

@ -89,6 +89,13 @@ void ClientInfo::write(WriteBuffer & out, const UInt64 server_protocol_revision)
writeBinary(uint8_t(0), out);
}
}
if (server_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
writeVarUInt(static_cast<UInt64>(collaborate_with_initiator), out);
writeVarUInt(count_participating_replicas, out);
writeVarUInt(number_of_current_replica, out);
}
}
@ -170,6 +177,15 @@ void ClientInfo::read(ReadBuffer & in, const UInt64 client_protocol_revision)
readBinary(client_trace_context.trace_flags, in);
}
}
if (client_protocol_revision >= DBMS_MIN_REVISION_WITH_PARALLEL_REPLICAS)
{
UInt64 value;
readVarUInt(value, in);
collaborate_with_initiator = static_cast<bool>(value);
readVarUInt(count_participating_replicas, in);
readVarUInt(number_of_current_replica, in);
}
}

View File

@ -108,6 +108,11 @@ public:
bool is_replicated_database_internal = false;
/// For parallel processing on replicas
bool collaborate_with_initiator{false};
UInt64 count_participating_replicas{0};
UInt64 number_of_current_replica{0};
bool empty() const { return query_kind == QueryKind::NO_QUERY; }
/** Serialization and deserialization.

View File

@ -184,6 +184,8 @@ public:
bool isLocal() const { return !local_addresses.empty(); }
bool hasRemoteConnections() const { return local_addresses.size() != per_replica_pools.size(); }
size_t getLocalNodeCount() const { return local_addresses.size(); }
size_t getRemoteNodeCount() const { return per_replica_pools.size() - local_addresses.size(); }
size_t getAllNodeCount() const { return per_replica_pools.size(); }
bool hasInternalReplication() const { return has_internal_replication; }
/// Name of directory for asynchronous write to StorageDistributed if has_internal_replication
const std::string & insertPathForInternalReplication(bool prefer_localhost_replica, bool use_compact_format) const;

View File

@ -37,7 +37,9 @@ public:
Block header;
size_t shard_num = 0;
size_t num_replicas = 0;
ConnectionPoolWithFailoverPtr pool;
ConnectionPoolPtrs per_replica_pools;
/// If we connect to replicas lazily.
/// (When there is a local replica with big delay).

View File

@ -117,7 +117,9 @@ void SelectStreamFactory::createForShard(
.query = modified_query_ast,
.header = header,
.shard_num = shard_info.shard_num,
.num_replicas = shard_info.getAllNodeCount(),
.pool = shard_info.pool,
.per_replica_pools = shard_info.per_replica_pools,
.lazy = lazy,
.local_delay = local_delay,
});

View File

@ -2962,7 +2962,7 @@ PartUUIDsPtr Context::getPartUUIDs() const
ReadTaskCallback Context::getReadTaskCallback() const
{
if (!next_task_callback.has_value())
throw Exception(fmt::format("Next task callback is not set for query {}", getInitialQueryId()), ErrorCodes::LOGICAL_ERROR);
throw Exception(ErrorCodes::LOGICAL_ERROR, "Next task callback is not set for query {}", getInitialQueryId());
return next_task_callback.value();
}
@ -2972,6 +2972,20 @@ void Context::setReadTaskCallback(ReadTaskCallback && callback)
next_task_callback = callback;
}
MergeTreeReadTaskCallback Context::getMergeTreeReadTaskCallback() const
{
if (!merge_tree_read_task_callback.has_value())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Next task callback for is not set for query {}", getInitialQueryId());
return merge_tree_read_task_callback.value();
}
void Context::setMergeTreeReadTaskCallback(MergeTreeReadTaskCallback && callback)
{
merge_tree_read_task_callback = callback;
}
PartUUIDsPtr Context::getIgnoredPartUUIDs() const
{
auto lock = getLock();

View File

@ -14,6 +14,7 @@
#include <Common/RemoteHostFilter.h>
#include <Common/isLocalAddress.h>
#include <base/types.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include "config_core.h"
@ -148,6 +149,8 @@ using InputBlocksReader = std::function<Block(ContextPtr)>;
/// Used in distributed task processing
using ReadTaskCallback = std::function<String()>;
using MergeTreeReadTaskCallback = std::function<std::optional<PartitionReadResponse>(PartitionReadRequest)>;
/// An empty interface for an arbitrary object that may be attached by a shared pointer
/// to query context, when using ClickHouse as a library.
struct IHostContext
@ -216,8 +219,12 @@ private:
Scalars scalars;
Scalars local_scalars;
/// Fields for distributed s3 function
/// Used in s3Cluster table function. With this callback, a worker node could ask an initiator
/// about next file to read from s3.
std::optional<ReadTaskCallback> next_task_callback;
/// Used in parallel reading from replicas. A replica tells about its intentions to read
/// some ranges from some part and initiator will tell the replica about whether it is accepted or denied.
std::optional<MergeTreeReadTaskCallback> merge_tree_read_task_callback;
/// Record entities accessed by current query, and store this information in system.query_log.
struct QueryAccessInfo
@ -865,6 +872,9 @@ public:
ReadTaskCallback getReadTaskCallback() const;
void setReadTaskCallback(ReadTaskCallback && callback);
MergeTreeReadTaskCallback getMergeTreeReadTaskCallback() const;
void setMergeTreeReadTaskCallback(MergeTreeReadTaskCallback && callback);
/// Background executors related methods
void initializeBackgroundExecutorsIfNeeded();

View File

@ -95,14 +95,16 @@ QualifiedTableName ExternalDictionariesLoader::qualifyDictionaryNameWithDatabase
return qualified_dictionary_name;
}
if (qualified_name->database.empty() && has(dictionary_name))
/// If dictionary was not qualified with database name, try to resolve dictionary as xml dictionary.
if (qualified_name->database.empty() && !has(qualified_name->table))
{
/// This is xml dictionary
return *qualified_name;
}
std::string current_database_name = query_context->getCurrentDatabase();
std::string resolved_name = resolveDictionaryNameFromDatabaseCatalog(dictionary_name, current_database_name);
if (qualified_name->database.empty())
qualified_name->database = query_context->getCurrentDatabase();
/// If after qualify dictionary_name with default_database_name we find it, add default_database to qualified name.
if (has(resolved_name))
qualified_name->database = std::move(current_database_name);
}
return *qualified_name;
}

View File

@ -981,9 +981,10 @@ BlockIO InterpreterCreateQuery::createTable(ASTCreateQuery & create)
return {};
/// If table has dependencies - add them to the graph
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext()->getGlobalContext(), query_ptr);
QualifiedTableName qualified_name{database_name, create.getTable()};
TableNamesSet loading_dependencies = getDependenciesSetFromCreateQuery(getContext()->getGlobalContext(), qualified_name, query_ptr);
if (!loading_dependencies.empty())
DatabaseCatalog::instance().addLoadingDependencies(QualifiedTableName{database_name, create.getTable()}, std::move(loading_dependencies));
DatabaseCatalog::instance().addLoadingDependencies(std::move(qualified_name), std::move(loading_dependencies));
return fillTableIfNeeded(create);
}

View File

@ -231,8 +231,8 @@ bool isStorageTouchedByMutations(
PullingPipelineExecutor executor(io.pipeline);
Block block;
while (!block.rows())
executor.pull(block);
while (executor.pull(block)) {}
if (!block.rows())
return false;
else if (block.rows() != 1)
@ -575,7 +575,10 @@ ASTPtr MutationsInterpreter::prepare(bool dry_run)
ErrorCodes::BAD_ARGUMENTS,
"Cannot materialize column `{}` because it doesn't have default expression", column.name);
stages.back().column_to_updated.emplace(column.name, column.default_desc.expression->clone());
auto materialized_column = makeASTFunction(
"_CAST", column.default_desc.expression->clone(), std::make_shared<ASTLiteral>(column.type->getName()));
stages.back().column_to_updated.emplace(column.name, materialized_column);
}
else if (command.type == MutationCommand::MATERIALIZE_INDEX)
{

View File

@ -465,9 +465,13 @@ void removeUnneededColumnsFromSelectClause(const ASTSelectQuery * select_query,
ASTFunction * func = elem->as<ASTFunction>();
/// Never remove untuple. It's result column may be in required columns.
/// It is not easy to analyze untuple here, because types were not calculated yes.
/// It is not easy to analyze untuple here, because types were not calculated yet.
if (func && func->name == "untuple")
new_elements.push_back(elem);
/// removing aggregation can change number of rows, so `count()` result in outer sub-query would be wrong
if (func && AggregateFunctionFactory::instance().isAggregateFunctionName(func->name) && !select_query->groupBy())
new_elements.push_back(elem);
}
}

View File

@ -1,6 +1,7 @@
#include "UserDefinedSQLFunctionVisitor.h"
#include <unordered_map>
#include <unordered_set>
#include <stack>
#include <Parsers/ASTFunction.h>
@ -18,19 +19,16 @@ namespace ErrorCodes
extern const int UNSUPPORTED_METHOD;
}
void UserDefinedSQLFunctionMatcher::visit(ASTPtr & ast, Data & data)
void UserDefinedSQLFunctionMatcher::visit(ASTPtr & ast, Data &)
{
auto * function = ast->as<ASTFunction>();
if (!function)
return;
auto result = tryToReplaceFunction(*function);
if (result)
{
ast = result;
visit(ast, data);
}
std::unordered_set<std::string> udf_in_replace_process;
auto replace_result = tryToReplaceFunction(*function, udf_in_replace_process);
if (replace_result)
ast = replace_result;
}
bool UserDefinedSQLFunctionMatcher::needChildVisit(const ASTPtr &, const ASTPtr &)
@ -38,8 +36,13 @@ bool UserDefinedSQLFunctionMatcher::needChildVisit(const ASTPtr &, const ASTPtr
return true;
}
ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & function)
ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process)
{
if (udf_in_replace_process.find(function.name) != udf_in_replace_process.end())
throw Exception(ErrorCodes::UNSUPPORTED_METHOD,
"Recursive function call detected during function call {}",
function.name);
auto user_defined_function = UserDefinedSQLFunctionFactory::instance().tryGet(function.name);
if (!user_defined_function)
return nullptr;
@ -71,10 +74,15 @@ ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & f
identifier_name_to_function_argument.emplace(identifier_name, function_argument);
}
auto [it, _] = udf_in_replace_process.emplace(function.name);
auto function_body_to_update = function_core_expression->children.at(1)->clone();
auto expression_list = std::make_shared<ASTExpressionList>();
expression_list->children.emplace_back(std::move(function_body_to_update));
std::stack<ASTPtr> ast_nodes_to_update;
ast_nodes_to_update.push(function_body_to_update);
ast_nodes_to_update.push(expression_list);
while (!ast_nodes_to_update.empty())
{
@ -83,6 +91,13 @@ ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & f
for (auto & child : ast_node_to_update->children)
{
if (auto * inner_function = child->as<ASTFunction>())
{
auto replace_result = tryToReplaceFunction(*inner_function, udf_in_replace_process);
if (replace_result)
child = replace_result;
}
auto identifier_name_opt = tryGetIdentifierName(child);
if (identifier_name_opt)
{
@ -104,6 +119,10 @@ ASTPtr UserDefinedSQLFunctionMatcher::tryToReplaceFunction(const ASTFunction & f
}
}
udf_in_replace_process.erase(it);
function_body_to_update = expression_list->children[0];
auto function_alias = function.tryGetAlias();
if (!function_alias.empty())

View File

@ -34,7 +34,7 @@ public:
private:
static void visit(ASTFunction & func, const Data & data);
static ASTPtr tryToReplaceFunction(const ASTFunction & function);
static ASTPtr tryToReplaceFunction(const ASTFunction & function, std::unordered_set<std::string> & udf_in_replace_process);
};

View File

@ -632,7 +632,13 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
}
{
OpenTelemetrySpanHolder span("IInterpreter::execute()");
std::unique_ptr<OpenTelemetrySpanHolder> span;
if (context->query_trace_context.trace_id != UUID())
{
auto raw_interpreter_ptr = interpreter.get();
std::string class_name(abi::__cxa_demangle(typeid(*raw_interpreter_ptr).name(), nullptr, nullptr, nullptr));
span = std::make_unique<OpenTelemetrySpanHolder>(class_name + "::execute()");
}
res = interpreter->execute();
}

View File

@ -22,7 +22,6 @@
#include <Parsers/ASTSubquery.h>
#include <Parsers/ASTTTLElement.h>
#include <Parsers/ASTWindowDefinition.h>
#include <Parsers/IAST.h>
#include <Parsers/ASTAssignment.h>
#include <Parsers/parseIdentifierOrStringLiteral.h>
@ -35,7 +34,6 @@
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/queryToString.h>
#include <boost/algorithm/string.hpp>
#include "ASTColumnsMatcher.h"
#include <Interpreters/StorageID.h>
@ -1935,15 +1933,21 @@ bool ParserColumnsTransformers::parseImpl(Pos & pos, ASTPtr & node, Expected & e
{
if (const auto * func = lambda->as<ASTFunction>(); func && func->name == "lambda")
{
if (func->arguments->children.size() != 2)
throw Exception(ErrorCodes::SYNTAX_ERROR, "lambda requires two arguments");
const auto * lambda_args_tuple = func->arguments->children.at(0)->as<ASTFunction>();
if (!lambda_args_tuple || lambda_args_tuple->name != "tuple")
throw Exception(ErrorCodes::SYNTAX_ERROR, "First argument of lambda must be a tuple");
const ASTs & lambda_arg_asts = lambda_args_tuple->arguments->children;
if (lambda_arg_asts.size() != 1)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "APPLY column transformer can only accept lambda with one argument");
throw Exception(ErrorCodes::SYNTAX_ERROR, "APPLY column transformer can only accept lambda with one argument");
if (auto opt_arg_name = tryGetIdentifierName(lambda_arg_asts[0]); opt_arg_name)
lambda_arg = *opt_arg_name;
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "lambda argument declarations must be identifiers");
throw Exception(ErrorCodes::SYNTAX_ERROR, "lambda argument declarations must be identifiers");
}
else
{

View File

@ -74,7 +74,8 @@ ReadFromMergeTree::ReadFromMergeTree(
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_)
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading)
: ISourceStep(DataStream{.header = MergeTreeBaseSelectProcessor::transformHeader(
metadata_snapshot_->getSampleBlockForColumns(real_column_names_, data_.getVirtuals(), data_.getStorageID()),
getPrewhereInfo(query_info_),
@ -107,6 +108,9 @@ ReadFromMergeTree::ReadFromMergeTree(
auto type = std::make_shared<DataTypeFloat64>();
output_stream->header.insert({type->createColumn(), type, "_sample_factor"});
}
if (enable_parallel_reading)
read_task_callback = context->getMergeTreeReadTaskCallback();
}
Pipe ReadFromMergeTree::readFromPool(
@ -127,6 +131,7 @@ Pipe ReadFromMergeTree::readFromPool(
}
const auto & settings = context->getSettingsRef();
const auto & client_info = context->getClientInfo();
MergeTreeReadPool::BackoffSettings backoff_settings(settings);
auto pool = std::make_shared<MergeTreeReadPool>(
@ -147,17 +152,30 @@ Pipe ReadFromMergeTree::readFromPool(
for (size_t i = 0; i < max_streams; ++i)
{
std::optional<ParallelReadingExtension> extension;
if (read_task_callback)
{
extension = ParallelReadingExtension
{
.callback = read_task_callback.value(),
.count_participating_replicas = client_info.count_participating_replicas,
.number_of_current_replica = client_info.number_of_current_replica,
.colums_to_read = required_columns
};
}
auto source = std::make_shared<MergeTreeThreadSelectProcessor>(
i, pool, min_marks_for_concurrent_read, max_block_size,
settings.preferred_block_size_bytes, settings.preferred_max_column_in_block_size_bytes,
data, metadata_snapshot, use_uncompressed_cache,
prewhere_info, actions_settings, reader_settings, virt_column_names);
prewhere_info, actions_settings, reader_settings, virt_column_names, std::move(extension));
if (i == 0)
{
/// Set the approximate number of rows for the first source only
/// Set the approximate number of rows for the first source only
/// In case of parallel processing on replicas do not set approximate rows at all.
/// Because the value will be identical on every replicas and will be accounted
/// multiple times (settings.max_parallel_replicas times more)
if (i == 0 && !client_info.collaborate_with_initiator)
source->addTotalRowsApprox(total_rows);
}
pipes.emplace_back(std::move(source));
}
@ -172,10 +190,22 @@ ProcessorPtr ReadFromMergeTree::createSource(
bool use_uncompressed_cache,
bool has_limit_below_one_block)
{
const auto & client_info = context->getClientInfo();
std::optional<ParallelReadingExtension> extension;
if (read_task_callback)
{
extension = ParallelReadingExtension
{
.callback = read_task_callback.value(),
.count_participating_replicas = client_info.count_participating_replicas,
.number_of_current_replica = client_info.number_of_current_replica,
.colums_to_read = required_columns
};
}
return std::make_shared<TSource>(
data, metadata_snapshot, part.data_part, max_block_size, preferred_block_size_bytes,
preferred_max_column_in_block_size_bytes, required_columns, part.ranges, use_uncompressed_cache, prewhere_info,
actions_settings, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block);
actions_settings, reader_settings, virt_column_names, part.part_index_in_query, has_limit_below_one_block, std::move(extension));
}
Pipe ReadFromMergeTree::readInOrder(

View File

@ -97,7 +97,8 @@ public:
bool sample_factor_column_queried_,
std::shared_ptr<PartitionIdToMaxBlock> max_block_numbers_to_read_,
Poco::Logger * log_,
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr_,
bool enable_parallel_reading
);
String getName() const override { return "ReadFromMergeTree"; }
@ -184,6 +185,8 @@ private:
MergeTreeDataSelectAnalysisResultPtr selectRangesToRead(MergeTreeData::DataPartsVector parts) const;
ReadFromMergeTree::AnalysisResult getAnalysisResult() const;
MergeTreeDataSelectAnalysisResultPtr analyzed_result_ptr;
std::optional<MergeTreeReadTaskCallback> read_task_callback;
};
struct MergeTreeDataSelectAnalysisResult

View File

@ -12,6 +12,8 @@
#include <Interpreters/InterpreterSelectQuery.h>
#include <IO/ConnectionTimeoutsContext.h>
#include <Common/checkStackSize.h>
#include <Client/ConnectionPool.h>
#include <Client/ConnectionPoolWithFailover.h>
namespace DB
{
@ -112,7 +114,10 @@ ReadFromRemote::ReadFromRemote(
{
}
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard)
void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@ -125,7 +130,10 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
}
auto lazily_create_stream = [
pool = shard.pool, shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header,
replica_info = replica_info,
pool = pool ? pool : shard.pool,
coordinator = coordinator,
shard_num = shard.shard_num, shard_count = shard_count, query = shard.query, header = shard.header,
context = context, throttler = throttler,
main_table = main_table, table_func_ptr = table_func_ptr,
scalars = scalars, external_tables = external_tables,
@ -161,9 +169,12 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
max_remote_delay = std::max(try_result.staleness, max_remote_delay);
}
if (try_results.empty() || local_delay < max_remote_delay)
/// We disable this branch in case of parallel reading from replicas, because createLocalPlan will call
/// InterpreterSelectQuery directly and it will be too ugly to pass ParallelReplicasCoordinator or some callback there.
if (!context->getClientInfo().collaborate_with_initiator && (try_results.empty() || local_delay < max_remote_delay))
{
auto plan = createLocalPlan(query, header, context, stage, shard_num, shard_count);
return QueryPipelineBuilder::getPipe(std::move(*plan->buildQueryPipeline(
QueryPlanOptimizationSettings::fromContext(context),
BuildQueryPipelineSettings::fromContext(context))));
@ -180,7 +191,8 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage);
pool, std::move(connections), query_string, header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = replica_info});
return createRemoteSourcePipe(remote_query_executor, add_agg_info, add_totals, add_extremes, async_read);
}
@ -191,7 +203,10 @@ void ReadFromRemote::addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFacto
addConvertingActions(pipes.back(), output_stream->header);
}
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard)
void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info)
{
bool add_agg_info = stage == QueryProcessingStage::WithMergeableState;
bool add_totals = false;
@ -207,11 +222,20 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::
scalars["_shard_num"]
= Block{{DataTypeUInt32().createColumnConst(1, shard.shard_num), std::make_shared<DataTypeUInt32>(), "_shard_num"}};
auto remote_query_executor = std::make_shared<RemoteQueryExecutor>(
shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage);
std::shared_ptr<RemoteQueryExecutor> remote_query_executor;
remote_query_executor = std::make_shared<RemoteQueryExecutor>(
pool ? pool : shard.pool, query_string, shard.header, context, throttler, scalars, external_tables, stage,
RemoteQueryExecutor::Extension{.parallel_reading_coordinator = std::move(coordinator), .replica_info = std::move(replica_info)});
remote_query_executor->setLogger(log);
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
/// In case of parallel reading from replicas we have a connection pool per replica.
/// Setting PoolMode will make no sense.
if (!pool)
remote_query_executor->setPoolMode(PoolMode::GET_MANY);
if (!table_func_ptr)
remote_query_executor->setMainTable(main_table);
@ -223,12 +247,51 @@ void ReadFromRemote::addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::
void ReadFromRemote::initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &)
{
Pipes pipes;
for (const auto & shard : shards)
const auto & settings = context->getSettingsRef();
const bool enable_sample_offset_parallel_processing = settings.max_parallel_replicas > 1 && !settings.allow_experimental_parallel_reading_from_replicas;
/// We have to create a pipe for each replica
/// FIXME: The second condition is only for tests to work, because hedged connections enabled by default.
if (settings.max_parallel_replicas > 1 && !enable_sample_offset_parallel_processing && !context->getSettingsRef().use_hedged_requests)
{
if (shard.lazy)
addLazyPipe(pipes, shard);
else
addPipe(pipes, shard);
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
for (const auto & shard : shards)
{
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
for (size_t replica_num = 0; replica_num < shard.num_replicas; ++replica_num)
{
IConnections::ReplicaInfo replica_info
{
.all_replicas_count = shard.num_replicas,
.number_of_current_replica = replica_num
};
auto pool = shard.per_replica_pools[replica_num];
auto pool_with_failover = std::make_shared<ConnectionPoolWithFailover>(
ConnectionPoolPtrs{pool}, current_settings.load_balancing);
if (shard.lazy)
addLazyPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
else
addPipe(pipes, shard, coordinator, pool_with_failover, replica_info);
}
}
}
else
{
for (const auto & shard : shards)
{
auto coordinator = std::make_shared<ParallelReplicasReadingCoordinator>();
if (shard.lazy)
addLazyPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
else
addPipe(pipes, shard, /*coordinator=*/nullptr, /*pool*/{}, /*replica_info*/std::nullopt);
}
}
auto pipe = Pipe::unitePipes(std::move(pipes));

View File

@ -1,9 +1,11 @@
#pragma once
#include <Processors/QueryPlan/ISourceStep.h>
#include <Core/QueryProcessingStage.h>
#include <Client/IConnections.h>
#include <Storages/IStorage_fwd.h>
#include <Interpreters/StorageID.h>
#include <Interpreters/ClusterProxy/IStreamFactory.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
namespace DB
{
@ -37,6 +39,12 @@ public:
void initializePipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
private:
enum class Mode
{
PerReplica,
PerShard
};
ClusterProxy::IStreamFactory::Shards shards;
QueryProcessingStage::Enum stage;
@ -52,8 +60,16 @@ private:
Poco::Logger * log;
UInt32 shard_count;
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard);
void addLazyPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addPipe(Pipes & pipes, const ClusterProxy::IStreamFactory::Shard & shard,
std::shared_ptr<ParallelReplicasReadingCoordinator> coordinator,
std::shared_ptr<ConnectionPoolWithFailover> pool,
std::optional<IConnections::ReplicaInfo> replica_info);
void addPipeForReplica();
};
}

View File

@ -403,12 +403,22 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
/// Close all other waiting for data outputs (there is no corresponding input for them).
while (!waiting_outputs.empty())
{
auto & output = output_ports[waiting_outputs.front()];
waiting_outputs.pop();
auto & output = output_ports[waiting_outputs.front()];
waiting_outputs.pop();
output.status = OutputStatus::Finished;
output.port->finish();
++num_finished_outputs;
if (output.status != OutputStatus::Finished)
++num_finished_outputs;
output.status = OutputStatus::Finished;
output.port->finish();
}
if (num_finished_outputs == outputs.size())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (disabled_input_ports.empty())
@ -418,4 +428,3 @@ IProcessor::Status StrictResizeProcessor::prepare(const PortNumbers & updated_in
}
}

View File

@ -7,6 +7,7 @@
#include <Columns/ColumnConst.h>
#include <Common/CurrentThread.h>
#include "Core/Protocol.h"
#include "IO/ReadHelpers.h"
#include <QueryPipeline/Pipe.h>
#include <Processors/Sources/SourceFromSingleChunk.h>
#include <Processors/Transforms/LimitsCheckingTransform.h>
@ -20,6 +21,7 @@
#include <Client/MultiplexedConnections.h>
#include <Client/HedgedConnections.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
#include <IO/ReadBufferFromString.h>
namespace CurrentMetrics
@ -42,21 +44,26 @@ namespace ErrorCodes
RemoteQueryExecutor::RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_), scalars(scalars_)
, external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_)
, external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
{}
RemoteQueryExecutor::RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_)
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, &connection, throttler]()
create_connections = [this, &connection, throttler, extension_]()
{
return std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
auto res = std::make_shared<MultiplexedConnections>(connection, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
};
}
@ -64,12 +71,15 @@ RemoteQueryExecutor::RemoteQueryExecutor(
std::shared_ptr<Connection> connection_ptr,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, task_iterator_)
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: RemoteQueryExecutor(query_, header_, context_, scalars_, external_tables_, stage_, extension_)
{
create_connections = [this, connection_ptr, throttler]()
create_connections = [this, connection_ptr, throttler, extension_]()
{
return std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
auto res = std::make_shared<MultiplexedConnections>(connection_ptr, context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
};
}
@ -78,12 +88,18 @@ RemoteQueryExecutor::RemoteQueryExecutor(
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, connections_, throttler]() mutable {
return std::make_shared<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
create_connections = [this, connections_, throttler, extension_]() mutable {
auto res = std::make_shared<MultiplexedConnections>(std::move(connections_), context->getSettingsRef(), throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
};
}
@ -91,11 +107,14 @@ RemoteQueryExecutor::RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler, const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_)
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_)
: header(header_), query(query_), context(context_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_), task_iterator(task_iterator_), pool(pool_)
, scalars(scalars_), external_tables(external_tables_), stage(stage_)
, task_iterator(extension_ ? extension_->task_iterator : nullptr)
, parallel_reading_coordinator(extension_ ? extension_->parallel_reading_coordinator : nullptr)
, pool(pool_)
{
create_connections = [this, throttler]()->std::shared_ptr<IConnections>
create_connections = [this, throttler, extension_]()->std::shared_ptr<IConnections>
{
const Settings & current_settings = context->getSettingsRef();
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(current_settings);
@ -107,7 +126,10 @@ RemoteQueryExecutor::RemoteQueryExecutor(
if (main_table)
table_to_check = std::make_shared<QualifiedTableName>(main_table.getQualifiedName());
return std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
auto res = std::make_shared<HedgedConnections>(pool, context, timeouts, throttler, pool_mode, table_to_check);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
}
#endif
@ -122,7 +144,10 @@ RemoteQueryExecutor::RemoteQueryExecutor(
else
connection_entries = pool->getMany(timeouts, &current_settings, pool_mode);
return std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
auto res = std::make_shared<MultiplexedConnections>(std::move(connection_entries), current_settings, throttler);
if (extension_ && extension_->replica_info)
res->setReplicaInfo(*extension_->replica_info);
return res;
};
}
@ -344,6 +369,9 @@ std::optional<Block> RemoteQueryExecutor::processPacket(Packet packet)
{
switch (packet.type)
{
case Protocol::Server::MergeTreeReadTaskRequest:
processMergeTreeReadTaskRequest(packet.request);
break;
case Protocol::Server::ReadTaskRequest:
processReadTaskRequest();
break;
@ -440,6 +468,15 @@ void RemoteQueryExecutor::processReadTaskRequest()
connections->sendReadTaskResponse(response);
}
void RemoteQueryExecutor::processMergeTreeReadTaskRequest(PartitionReadRequest request)
{
if (!parallel_reading_coordinator)
throw Exception("Coordinator for parallel reading from replicas is not initialized", ErrorCodes::LOGICAL_ERROR);
auto response = parallel_reading_coordinator->handleRequest(std::move(request));
connections->sendMergeTreeReadTaskResponse(response);
}
void RemoteQueryExecutor::finish(std::unique_ptr<ReadContext> * read_context)
{
/** If one of:

View File

@ -1,5 +1,7 @@
#pragma once
#include <variant>
#include <Client/ConnectionPool.h>
#include <Client/IConnections.h>
#include <Client/ConnectionPoolWithFailover.h>
@ -7,7 +9,7 @@
#include <Interpreters/Context.h>
#include <Interpreters/StorageID.h>
#include <Common/TimerDescriptor.h>
#include <variant>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
namespace DB
@ -35,20 +37,33 @@ class RemoteQueryExecutor
public:
using ReadContext = RemoteQueryExecutorReadContext;
/// We can provide additional logic for RemoteQueryExecutor
/// For example for s3Cluster table function we provide an Iterator over tasks to do.
/// Nodes involved into the query send request for a new task and we answer them using this object.
/// In case of parallel reading from replicas we provide a Coordinator object
/// Every replica will tell us about parts and mark ranges it wants to read and coordinator will
/// decide whether to deny or to accept that request.
struct Extension
{
std::shared_ptr<TaskIterator> task_iterator{nullptr};
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
std::optional<IConnections::ReplicaInfo> replica_info;
};
/// Takes already set connection.
/// We don't own connection, thus we have to drain it synchronously.
RemoteQueryExecutor(
Connection & connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Takes already set connection.
RemoteQueryExecutor(
std::shared_ptr<Connection> connection,
const String & query_, const Block & header_, ContextPtr context_,
ThrottlerPtr throttler_ = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Accepts several connections already taken from pool.
RemoteQueryExecutor(
@ -56,14 +71,14 @@ public:
std::vector<IConnectionPool::Entry> && connections_,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
/// Takes a pool and gets one or several connections from it.
RemoteQueryExecutor(
const ConnectionPoolWithFailoverPtr & pool,
const String & query_, const Block & header_, ContextPtr context_,
const ThrottlerPtr & throttler = nullptr, const Scalars & scalars_ = Scalars(), const Tables & external_tables_ = Tables(),
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::shared_ptr<TaskIterator> task_iterator_ = {});
QueryProcessingStage::Enum stage_ = QueryProcessingStage::Complete, std::optional<Extension> extension_ = std::nullopt);
~RemoteQueryExecutor();
@ -115,7 +130,7 @@ private:
RemoteQueryExecutor(
const String & query_, const Block & header_, ContextPtr context_,
const Scalars & scalars_, const Tables & external_tables_,
QueryProcessingStage::Enum stage_, std::shared_ptr<TaskIterator> task_iterator_);
QueryProcessingStage::Enum stage_, std::optional<Extension> extension_);
Block header;
Block totals;
@ -136,6 +151,13 @@ private:
/// Initiator identifier for distributed task processing
std::shared_ptr<TaskIterator> task_iterator;
std::shared_ptr<ParallelReplicasReadingCoordinator> parallel_reading_coordinator;
/// This is needed only for parallel reading from replicas, because
/// we create a RemoteQueryExecutor per replica and have to store additional info
/// about the number of the current replica or the count of replicas at all.
IConnections::ReplicaInfo replica_info;
std::function<std::shared_ptr<IConnections>()> create_connections;
/// Hold a shared reference to the connection pool so that asynchronous connection draining will
/// work safely. Make sure it's the first member so that we don't destruct it too early.
@ -203,6 +225,8 @@ private:
void processReadTaskRequest();
void processMergeTreeReadTaskRequest(PartitionReadRequest request);
/// Cancell query and restart it with info about duplicated UUIDs
/// only for `allow_experimental_query_deduplication`.
std::variant<Block, int> restartQueryWithoutDuplicatedUUIDs(std::unique_ptr<ReadContext> * read_context = nullptr);

View File

@ -310,10 +310,25 @@ void TCPHandler::runImpl()
query_context->setReadTaskCallback([this]() -> String
{
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
return {};
sendReadTaskRequestAssumeLocked();
return receiveReadTaskResponseAssumeLocked();
});
query_context->setMergeTreeReadTaskCallback([this](PartitionReadRequest request) -> std::optional<PartitionReadResponse>
{
std::lock_guard lock(task_callback_mutex);
if (state.is_cancelled)
return std::nullopt;
sendMergeTreeReadTaskRequstAssumeLocked(std::move(request));
return receivePartitionMergeTreeReadTaskResponseAssumeLocked();
});
/// Processing Query
state.io = executeQuery(state.query, query_context, false, state.stage);
@ -663,10 +678,13 @@ void TCPHandler::processOrdinaryQueryWithProcessors()
Block block;
while (executor.pull(block, interactive_delay / 1000))
{
std::lock_guard lock(task_callback_mutex);
std::unique_lock lock(task_callback_mutex);
if (isQueryCancelled())
{
/// Several callback like callback for parallel reading could be called from inside the pipeline
/// and we have to unlock the mutex from our side to prevent deadlock.
lock.unlock();
/// A packet was received requesting to stop execution of the request.
executor.cancel();
break;
@ -786,6 +804,15 @@ void TCPHandler::sendReadTaskRequestAssumeLocked()
out->next();
}
void TCPHandler::sendMergeTreeReadTaskRequstAssumeLocked(PartitionReadRequest request)
{
writeVarUInt(Protocol::Server::MergeTreeReadTaskRequest, *out);
request.serialize(*out);
out->next();
}
void TCPHandler::sendProfileInfo(const ProfileInfo & info)
{
writeVarUInt(Protocol::Server::ProfileInfo, *out);
@ -1297,6 +1324,35 @@ String TCPHandler::receiveReadTaskResponseAssumeLocked()
}
std::optional<PartitionReadResponse> TCPHandler::receivePartitionMergeTreeReadTaskResponseAssumeLocked()
{
UInt64 packet_type = 0;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::MergeTreeReadTaskResponse)
{
if (packet_type == Protocol::Client::Cancel)
{
state.is_cancelled = true;
/// For testing connection collector.
if (sleep_in_receive_cancel.totalMilliseconds())
{
std::chrono::milliseconds ms(sleep_in_receive_cancel.totalMilliseconds());
std::this_thread::sleep_for(ms);
}
return std::nullopt;
}
else
{
throw Exception(fmt::format("Received {} packet after requesting read task",
Protocol::Client::toString(packet_type)), ErrorCodes::UNEXPECTED_PACKET_FROM_CLIENT);
}
}
PartitionReadResponse response;
response.deserialize(*in);
return response;
}
void TCPHandler::receiveClusterNameAndSalt()
{
readStringBinary(cluster, *in);
@ -1697,7 +1753,7 @@ bool TCPHandler::isQueryCancelled()
return true;
default:
throw NetException("Unknown packet from client", ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
throw NetException("Unknown packet from client " + toString(packet_type), ErrorCodes::UNKNOWN_PACKET_FROM_CLIENT);
}
}

Some files were not shown because too many files have changed in this diff Show More