Merge branch 'master' of github.com:ClickHouse/ClickHouse into poco-file-to-std-fs

This commit is contained in:
kssenii 2021-05-05 18:42:40 +03:00
commit 2dabdd0f73
61 changed files with 1395 additions and 992 deletions

View File

@ -91,6 +91,10 @@ ReplxxLineReader::ReplxxLineReader(
/// it is also bound to M-p/M-n).
rx.bind_key(Replxx::KEY::meta('N'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_NEXT, code); });
rx.bind_key(Replxx::KEY::meta('P'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::COMPLETE_PREVIOUS, code); });
/// By default M-BACKSPACE is KILL_TO_WHITESPACE_ON_LEFT, while in readline it is backward-kill-word
rx.bind_key(Replxx::KEY::meta(Replxx::KEY::BACKSPACE), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_BEGINING_OF_WORD, code); });
/// By default C-w is KILL_TO_BEGINING_OF_WORD, while in readline it is unix-word-rubout
rx.bind_key(Replxx::KEY::control('W'), [this](char32_t code) { return rx.invoke(Replxx::ACTION::KILL_TO_WHITESPACE_ON_LEFT, code); });
rx.bind_key(Replxx::KEY::meta('E'), [this](char32_t) { openEditor(); return Replxx::ACTION_RESULT::CONTINUE; });
}

contrib/librdkafka vendored

@ -1 +1 @@
Subproject commit cf11d0aa36d4738f2c9bf4377807661660f1be76
Subproject commit 43491d33ca2826531d1e3cae70d4bf1e5249e3c9

View File

@ -61,7 +61,7 @@ RUN apt-get update \
RUN echo 'deb http://archive.ubuntu.com/ubuntu/ focal-proposed restricted main multiverse universe' > /etc/apt/sources.list.d/proposed-repositories.list
RUN apt-get update \
&& apt-get install gcc-10 g++-10 --yes
&& apt-get install gcc-10 g++-10 --yes --no-install-recommends
RUN rm /etc/apt/sources.list.d/proposed-repositories.list && apt-get update

View File

@ -124,4 +124,11 @@ Reboot.
To check if it's working, you can use the `ulimit -n` command.
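For example, in a new shell session the check might look like this (the exact number depends on the limit you configured above):
``` bash
# Should print the raised limit rather than the macOS default of 256
ulimit -n
```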
## Run ClickHouse server:
```
cd ClickHouse
./build/programs/clickhouse-server --config-file ./programs/server/config.xml
```
[Original article](https://clickhouse.tech/docs/en/development/build_osx/) <!--hide-->

View File

@ -21,18 +21,19 @@ echo https://transtats.bts.gov/PREZIP/On_Time_Reporting_Carrier_On_Time_Performa
Creating a table:
``` sql
CREATE TABLE `ontime` (
CREATE TABLE `ontime`
(
`Year` UInt16,
`Quarter` UInt8,
`Month` UInt8,
`DayofMonth` UInt8,
`DayOfWeek` UInt8,
`FlightDate` Date,
`UniqueCarrier` FixedString(7),
`AirlineID` Int32,
`Carrier` FixedString(2),
`TailNum` String,
`FlightNum` String,
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,
`OriginCityMarketID` Int32,
@ -74,7 +75,7 @@ CREATE TABLE `ontime` (
`Diverted` UInt8,
`CRSElapsedTime` Int32,
`ActualElapsedTime` Int32,
`AirTime` Int32,
`AirTime` Nullable(Int32),
`Flights` Int32,
`Distance` Int32,
`DistanceGroup` UInt8,
@ -132,9 +133,9 @@ CREATE TABLE `ontime` (
`Div5WheelsOff` String,
`Div5TailNum` String
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
PARTITION BY Year
ORDER BY (IATA_CODE_Reporting_Airline, FlightDate)
SETTINGS index_granularity = 8192;
```
Loading data with multiple threads:
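(The command itself falls outside this hunk; the localized versions of this page updated in the same commit use the following, and the English page presumably matches:)
``` bash
ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'"
```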
@ -206,7 +207,7 @@ LIMIT 10;
Q4. The number of delays by carrier for 2007
``` sql
SELECT Carrier, count(*)
SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*)
FROM ontime
WHERE DepDelay>10 AND Year=2007
GROUP BY Carrier
@ -220,29 +221,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year=2007
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year=2007
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year=2007
GROUP BY Carrier
@ -256,29 +257,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year>=2000 AND Year<=2008
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
@ -297,7 +298,7 @@ FROM
from ontime
WHERE DepDelay>10
GROUP BY Year
)
) q
JOIN
(
select
@ -305,7 +306,7 @@ JOIN
count(*) as c2
from ontime
GROUP BY Year
) USING (Year)
) qq USING (Year)
ORDER BY Year;
```
@ -340,7 +341,7 @@ Q10.
``` sql
SELECT
min(Year), max(Year), Carrier, count(*) AS cnt,
min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt,
sum(ArrDelayMinutes>30) AS flights_delayed,
round(sum(ArrDelayMinutes>30)/count(*),2) AS rate
FROM ontime

View File

@ -430,7 +430,7 @@ Keys for syslog:
Default value: `LOG_USER` if `address` is specified, `LOG_DAEMON` otherwise.
- format - Message format. Possible values: `bsd` and `syslog`.
## send_crash_reports {#server_configuration_parameters-logger}
## send_crash_reports {#server_configuration_parameters-send_crash_reports}
Settings for opt-in sending crash reports to the ClickHouse core developers team via [Sentry](https://sentry.io).
Enabling it, especially in pre-production environments, is highly appreciated.
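A minimal sketch of enabling it, assuming the section accepts an `enabled` flag as in the default `config.xml` and a standard package layout (both are assumptions; adjust for your install):
``` bash
# Hypothetical drop-in config; file name and paths are illustrative
sudo tee /etc/clickhouse-server/config.d/send_crash_reports.xml > /dev/null <<'EOF'
<yandex>
    <send_crash_reports>
        <enabled>true</enabled>
    </send_crash_reports>
</yandex>
EOF
sudo systemctl restart clickhouse-server
```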

View File

@ -143,6 +143,16 @@ Possible values:
Default value: 0.
## http_max_uri_size {#http-max-uri-size}
Sets the maximum URI length of an HTTP request.
Possible values:
- Positive integer.
Default value: 1048576.
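As a quick way to confirm the effective value on a running server (assuming `clickhouse-client` can reach it locally), the setting can be read from `system.settings`:
``` bash
clickhouse-client --query "SELECT name, value FROM system.settings WHERE name = 'http_max_uri_size'"
```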
## send_progress_in_http_headers {#settings-send_progress_in_http_headers}
Enables or disables `X-ClickHouse-Progress` HTTP response headers in `clickhouse-server` responses.
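A sketch of observing these headers, assuming a server listening on the default HTTP port 8123:
``` bash
# -v prints response headers; X-ClickHouse-Progress lines appear while the query runs
curl -v "http://localhost:8123/?send_progress_in_http_headers=1" \
    --data-binary "SELECT sum(number) FROM numbers(500000000)"
```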

View File

@ -18,6 +18,10 @@ Columns:
- `data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in local files, in bytes.
- `broken_data_files` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Number of files that have been marked as broken (due to an error).
- `broken_data_compressed_bytes` ([UInt64](../../sql-reference/data-types/int-uint.md)) — Size of compressed data in broken files, in bytes.
- `last_exception` ([String](../../sql-reference/data-types/string.md)) — Text message about the last error that occurred (if any).
**Example**

View File

@ -224,7 +224,7 @@ assumeNotNull(x)
**Returned values**
- The original value from the non-`Nullable` type, if it is not `NULL`.
- The default value for the non-`Nullable` type if the original value was `NULL`.
- Implementation specific result if the original value was `NULL`.
**Example**

View File

@ -29,18 +29,19 @@ done
Creating a table:
``` sql
CREATE TABLE `ontime` (
CREATE TABLE `ontime`
(
`Year` UInt16,
`Quarter` UInt8,
`Month` UInt8,
`DayofMonth` UInt8,
`DayOfWeek` UInt8,
`FlightDate` Date,
`UniqueCarrier` FixedString(7),
`AirlineID` Int32,
`Carrier` FixedString(2),
`TailNum` String,
`FlightNum` String,
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,
`OriginCityMarketID` Int32,
@ -82,7 +83,7 @@ CREATE TABLE `ontime` (
`Diverted` UInt8,
`CRSElapsedTime` Int32,
`ActualElapsedTime` Int32,
`AirTime` Int32,
`AirTime` Nullable(Int32),
`Flights` Int32,
`Distance` Int32,
`DistanceGroup` UInt8,
@ -140,15 +141,15 @@ CREATE TABLE `ontime` (
`Div5WheelsOff` String,
`Div5TailNum` String
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
PARTITION BY Year
ORDER BY (IATA_CODE_Reporting_Airline, FlightDate)
SETTINGS index_granularity = 8192;
```
Loading data:
``` bash
$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done
ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'"
```
## Download of prepared partitions {#download-of-prepared-partitions}
@ -212,10 +213,10 @@ LIMIT 10;
Q4. The number of delays by carrier for 2007
``` sql
SELECT Carrier, count(*)
SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*)
FROM ontime
WHERE DepDelay>10 AND Year=2007
GROUP BY Carrier
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY count(*) DESC;
```
@ -226,32 +227,32 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year=2007
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year=2007
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year=2007
GROUP BY Carrier
GROUP BY IATA_CODE_Reporting_Airline
ORDER BY c3 DESC
```
@ -262,29 +263,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year>=2000 AND Year<=2008
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
@ -303,7 +304,7 @@ FROM
from ontime
WHERE DepDelay>10
GROUP BY Year
)
) q
JOIN
(
select
@ -311,7 +312,7 @@ JOIN
count(*) as c2
from ontime
GROUP BY Year
) USING (Year)
) qq USING (Year)
ORDER BY Year;
```
@ -346,7 +347,7 @@ Q10.
``` sql
SELECT
min(Year), max(Year), Carrier, count(*) AS cnt,
min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt,
sum(ArrDelayMinutes>30) AS flights_delayed,
round(sum(ArrDelayMinutes>30)/count(*),2) AS rate
FROM ontime

View File

@ -27,18 +27,19 @@ done
Creating a table:
``` sql
CREATE TABLE `ontime` (
CREATE TABLE `ontime`
(
`Year` UInt16,
`Quarter` UInt8,
`Month` UInt8,
`DayofMonth` UInt8,
`DayOfWeek` UInt8,
`FlightDate` Date,
`UniqueCarrier` FixedString(7),
`AirlineID` Int32,
`Carrier` FixedString(2),
`TailNum` String,
`FlightNum` String,
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,
`OriginCityMarketID` Int32,
@ -80,7 +81,7 @@ CREATE TABLE `ontime` (
`Diverted` UInt8,
`CRSElapsedTime` Int32,
`ActualElapsedTime` Int32,
`AirTime` Int32,
`AirTime` Nullable(Int32),
`Flights` Int32,
`Distance` Int32,
`DistanceGroup` UInt8,
@ -138,15 +139,15 @@ CREATE TABLE `ontime` (
`Div5WheelsOff` String,
`Div5TailNum` String
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
PARTITION BY Year
ORDER BY (IATA_CODE_Reporting_Airline, FlightDate)
SETTINGS index_granularity = 8192;
```
Loading data:
``` bash
$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done
ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'"
```
## Downloading prepared partitions {#skachivanie-gotovykh-partitsii}
@ -211,7 +212,7 @@ LIMIT 10;
Q4. The number of delays by carrier for 2007
``` sql
SELECT Carrier, count(*)
SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*)
FROM ontime
WHERE DepDelay>10 AND Year=2007
GROUP BY Carrier
@ -225,29 +226,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year=2007
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year=2007
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
A better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year=2007
GROUP BY Carrier
@ -261,29 +262,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year>=2000 AND Year<=2008
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
A better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
@ -302,7 +303,7 @@ FROM
from ontime
WHERE DepDelay>10
GROUP BY Year
)
) q
JOIN
(
select
@ -310,7 +311,7 @@ JOIN
count(*) as c2
from ontime
GROUP BY Year
) USING (Year)
) qq USING (Year)
ORDER BY Year;
```
@ -346,7 +347,7 @@ Q10.
``` sql
SELECT
min(Year), max(Year), Carrier, count(*) AS cnt,
min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt,
sum(ArrDelayMinutes>30) AS flights_delayed,
round(sum(ArrDelayMinutes>30)/count(*),2) AS rate
FROM ontime

View File

@ -415,7 +415,7 @@ ClickHouse проверяет условия для `min_part_size` и `min_part
Default values: `LOG_USER` if `address` is specified, otherwise `LOG_DAEMON`.
- format - Message format. Possible values: `bsd` and `syslog`.
## send_crash_reports {#server_configuration_parameters-logger}
## send_crash_reports {#server_configuration_parameters-send_crash_reports}
Settings for opt-in sending of crash reports to the ClickHouse core developers team via [Sentry](https://sentry.io).
Enabling these settings, especially in a pre-production environment, can provide very valuable information and helps the development of ClickHouse.

View File

@ -119,6 +119,16 @@ ClickHouse применяет настройку в тех случаях, ко
Default value: 0.
## http_max_uri_size {#http-max-uri-size}
Sets the maximum URI length of an HTTP request.
Possible values:
- Positive integer.
Default value: 1048576.
## send_progress_in_http_headers {#settings-send_progress_in_http_headers}
Enables or disables `X-ClickHouse-Progress` HTTP response headers in `clickhouse-server` responses.

View File

@ -224,7 +224,7 @@ assumeNotNull(x)
**Returned values**
- The original value from the non-`Nullable` type, if it is not `NULL`.
- The default value for the non-`Nullable` type if the original value was `NULL`.
- An implementation-specific result if the original value was `NULL`.
**Example**

View File

@ -29,18 +29,19 @@ done
Creating the table structure:
``` sql
CREATE TABLE `ontime` (
CREATE TABLE `ontime`
(
`Year` UInt16,
`Quarter` UInt8,
`Month` UInt8,
`DayofMonth` UInt8,
`DayOfWeek` UInt8,
`FlightDate` Date,
`UniqueCarrier` FixedString(7),
`AirlineID` Int32,
`Carrier` FixedString(2),
`TailNum` String,
`FlightNum` String,
`Reporting_Airline` String,
`DOT_ID_Reporting_Airline` Int32,
`IATA_CODE_Reporting_Airline` String,
`Tail_Number` Int32,
`Flight_Number_Reporting_Airline` String,
`OriginAirportID` Int32,
`OriginAirportSeqID` Int32,
`OriginCityMarketID` Int32,
@ -82,7 +83,7 @@ CREATE TABLE `ontime` (
`Diverted` UInt8,
`CRSElapsedTime` Int32,
`ActualElapsedTime` Int32,
`AirTime` Int32,
`AirTime` Nullable(Int32),
`Flights` Int32,
`Distance` Int32,
`DistanceGroup` UInt8,
@ -140,15 +141,15 @@ CREATE TABLE `ontime` (
`Div5WheelsOff` String,
`Div5TailNum` String
) ENGINE = MergeTree
PARTITION BY Year
ORDER BY (Carrier, FlightDate)
SETTINGS index_granularity = 8192;
PARTITION BY Year
ORDER BY (IATA_CODE_Reporting_Airline, FlightDate)
SETTINGS index_granularity = 8192;
```
Loading data:
``` bash
$ for i in *.zip; do echo $i; unzip -cq $i '*.csv' | sed 's/\.00//g' | clickhouse-client --host=example-perftest01j --query="INSERT INTO ontime FORMAT CSVWithNames"; done
ls -1 *.zip | xargs -I{} -P $(nproc) bash -c "echo {}; unzip -cq {} '*.csv' | sed 's/\.00//g' | clickhouse-client --input_format_with_names_use_header=0 --query='INSERT INTO ontime FORMAT CSVWithNames'"
```
## Download of prepared partitions {#xia-zai-yu-chu-li-hao-de-fen-qu-shu-ju}
@ -212,7 +213,7 @@ LIMIT 10;
Q4. The number of delays of more than 10 minutes by carrier for 2007
``` sql
SELECT Carrier, count(*)
SELECT IATA_CODE_Reporting_Airline AS Carrier, count(*)
FROM ontime
WHERE DepDelay>10 AND Year=2007
GROUP BY Carrier
@ -226,29 +227,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year=2007
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year=2007
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year=2007
GROUP BY Carrier
@ -262,29 +263,29 @@ SELECT Carrier, c, c2, c*100/c2 as c3
FROM
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c
FROM ontime
WHERE DepDelay>10
AND Year>=2000 AND Year<=2008
GROUP BY Carrier
)
) q
JOIN
(
SELECT
Carrier,
IATA_CODE_Reporting_Airline AS Carrier,
count(*) AS c2
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
) USING Carrier
) qq USING Carrier
ORDER BY c3 DESC;
```
Better version of the same query:
``` sql
SELECT Carrier, avg(DepDelay>10)*100 AS c3
SELECT IATA_CODE_Reporting_Airline AS Carrier, avg(DepDelay>10)*100 AS c3
FROM ontime
WHERE Year>=2000 AND Year<=2008
GROUP BY Carrier
@ -303,7 +304,7 @@ FROM
from ontime
WHERE DepDelay>10
GROUP BY Year
)
) q
JOIN
(
select
@ -311,7 +312,7 @@ JOIN
count(*) as c2
from ontime
GROUP BY Year
) USING (Year)
) qq USING (Year)
ORDER BY Year;
```
@ -346,7 +347,7 @@ Q10.
``` sql
SELECT
min(Year), max(Year), Carrier, count(*) AS cnt,
min(Year), max(Year), IATA_CODE_Reporting_Airline AS Carrier, count(*) AS cnt,
sum(ArrDelayMinutes>30) AS flights_delayed,
round(sum(ArrDelayMinutes>30)/count(*),2) AS rate
FROM ontime

View File

@ -23,19 +23,9 @@ function _complete_for_clickhouse_entrypoint_bin()
fi
util="${words[1]}"
case "$prev" in
-C|--config-file|--config)
return
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return
;;
esac
if _complete_for_clickhouse_generic_bin_impl "$prev"; then
COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd" "$util")" -- "$cur") )
fi
return 0
}

View File

@ -15,6 +15,13 @@ shopt -s extglob
export _CLICKHOUSE_COMPLETION_LOADED=1
CLICKHOUSE_QueryProcessingStage=(
complete
fetch_columns
with_mergeable_state
with_mergeable_state_after_aggregation
)
function _clickhouse_bin_exist()
{ [ -x "$1" ] || command -v "$1" >& /dev/null; }
@ -30,6 +37,33 @@ function _clickhouse_get_options()
"$@" --help 2>&1 | awk -F '[ ,=<>]' '{ for (i=1; i <= NF; ++i) { if (substr($i, 0, 1) == "-" && length($i) > 1) print $i; } }' | sort -u
}
function _complete_for_clickhouse_generic_bin_impl()
{
local prev=$1 && shift
case "$prev" in
-C|--config-file|--config)
return 1
;;
--stage)
COMPREPLY=( $(compgen -W "${CLICKHOUSE_QueryProcessingStage[*]}" -- "$cur") )
return 1
;;
--host)
COMPREPLY=( $(compgen -A hostname -- "$cur") )
return 1
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return 1
;;
esac
return 0
}
function _complete_for_clickhouse_generic_bin()
{
local cur prev
@ -39,19 +73,9 @@ function _complete_for_clickhouse_generic_bin()
COMPREPLY=()
_get_comp_words_by_ref cur prev
case "$prev" in
-C|--config-file|--config)
return
;;
# Argh... This looks like a bash bug...
# Redirections are passed to the completion function
# although it is managed by the shell directly...
'<'|'>'|'>>'|[12]'>'|[12]'>>')
return
;;
esac
if _complete_for_clickhouse_generic_bin_impl "$prev"; then
COMPREPLY=( $(compgen -W "$(_clickhouse_get_options "$cmd")" -- "$cur") )
fi
return 0
}

View File

@ -27,6 +27,7 @@
#include <Parsers/formatAST.h>
#include <Parsers/parseQuery.h>
namespace DB
{

View File

@ -50,7 +50,7 @@ struct QueryFuzzer
// Some debug fields for detecting problematic ASTs with loops.
// These are reset for each fuzzMain call.
std::unordered_set<const IAST *> debug_visited_nodes;
ASTPtr * debug_top_ast;
ASTPtr * debug_top_ast = nullptr;
// This is the only function you have to call -- it will modify the passed

View File

@ -24,11 +24,11 @@ LibraryBridgeHelper::LibraryBridgeHelper(
ContextPtr context_,
const Block & sample_block_,
const Field & dictionary_id_)
: IBridgeHelper(context_)
: IBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get("LibraryBridgeHelper"))
, sample_block(sample_block_)
, config(context_->getConfigRef())
, http_timeout(context_->getSettingsRef().http_receive_timeout.value)
, http_timeout(context_->getGlobalContext()->getSettingsRef().http_receive_timeout.value)
, dictionary_id(dictionary_id_)
{
bridge_port = config.getUInt("library_bridge.port", DEFAULT_PORT);

View File

@ -62,20 +62,19 @@ public:
static constexpr inline auto SCHEMA_ALLOWED_HANDLER = "/schema_allowed";
XDBCBridgeHelper(
ContextPtr global_context_,
ContextPtr context_,
Poco::Timespan http_timeout_,
const std::string & connection_string_)
: IXDBCBridgeHelper(global_context_)
: IXDBCBridgeHelper(context_->getGlobalContext())
, log(&Poco::Logger::get(BridgeHelperMixin::getName() + "BridgeHelper"))
, connection_string(connection_string_)
, http_timeout(http_timeout_)
, config(global_context_->getConfigRef())
, config(context_->getGlobalContext()->getConfigRef())
{
bridge_host = config.getString(BridgeHelperMixin::configPrefix() + ".host", DEFAULT_HOST);
bridge_port = config.getUInt(BridgeHelperMixin::configPrefix() + ".port", DEFAULT_PORT);
}
protected:
auto getConnectionString() const { return connection_string; }

View File

@ -1,6 +1,5 @@
#pragma once
#include <Common/HashTable/HashTable.h>
#include <Common/HashTable/HashTableKeyHolder.h>
#include <Common/ColumnsHashingImpl.h>
@ -15,6 +14,8 @@
#include <Core/Defines.h>
#include <memory>
#include <cassert>
namespace DB
{
@ -594,8 +595,11 @@ struct HashMethodKeysFixed
return prepared_keys[row];
#if defined(__SSSE3__) && !defined(MEMORY_SANITIZER)
if constexpr (!has_low_cardinality && !has_nullable_keys && sizeof(Key) <= 16)
if constexpr (sizeof(Key) <= 16)
{
assert(!has_low_cardinality && !has_nullable_keys);
return packFixedShuffle<Key>(columns_data.get(), keys_size, key_sizes.data(), row, masks.get());
}
#endif
return packFixed<Key>(row, keys_size, Base::getActualColumns(), key_sizes);
}

View File

@ -55,6 +55,7 @@
M(LocalThread, "Number of threads in local thread pools. The threads in local thread pools are taken from the global thread pool.") \
M(LocalThreadActive, "Number of threads in local thread pools running a task.") \
M(DistributedFilesToInsert, "Number of pending files to process for asynchronous insertion into Distributed tables. Number of files for every shard is summed.") \
M(BrokenDistributedFilesToInsert, "Number of files for asynchronous insertion into Distributed tables that have been marked as broken. This metric starts from 0 on server start. Number of files for every shard is summed.") \
M(TablesToDropQueueSize, "Number of dropped tables, that are waiting for background data removal.") \
M(MaxDDLEntryID, "Max processed DDL entry of DDLWorker.") \
M(PartsTemporary, "The part is generating now, it is not in data_parts list.") \

View File

@ -91,49 +91,38 @@ inline UInt32 updateWeakHash32(const DB::UInt8 * pos, size_t size, DB::UInt32 up
{
if (size < 8)
{
DB::UInt64 value = 0;
auto * value_ptr = reinterpret_cast<unsigned char *>(&value);
UInt64 value = 0;
typedef __attribute__((__aligned__(1))) uint16_t uint16_unaligned_t;
typedef __attribute__((__aligned__(1))) uint32_t uint32_unaligned_t;
/// Adopted code from FastMemcpy.h (memcpy_tiny)
switch (size)
{
case 0:
break;
case 1:
value_ptr[0] = pos[0];
__builtin_memcpy(&value, pos, 1);
break;
case 2:
*reinterpret_cast<uint16_t *>(value_ptr) = *reinterpret_cast<const uint16_unaligned_t *>(pos);
__builtin_memcpy(&value, pos, 2);
break;
case 3:
*reinterpret_cast<uint16_t *>(value_ptr) = *reinterpret_cast<const uint16_unaligned_t *>(pos);
value_ptr[2] = pos[2];
__builtin_memcpy(&value, pos, 3);
break;
case 4:
*reinterpret_cast<uint32_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos);
__builtin_memcpy(&value, pos, 4);
break;
case 5:
*reinterpret_cast<uint32_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos);
value_ptr[4] = pos[4];
__builtin_memcpy(&value, pos, 5);
break;
case 6:
*reinterpret_cast<uint32_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos);
*reinterpret_cast<uint16_unaligned_t *>(value_ptr + 4) =
*reinterpret_cast<const uint16_unaligned_t *>(pos + 4);
__builtin_memcpy(&value, pos, 6);
break;
case 7:
*reinterpret_cast<uint32_t *>(value_ptr) = *reinterpret_cast<const uint32_unaligned_t *>(pos);
*reinterpret_cast<uint32_unaligned_t *>(value_ptr + 3) =
*reinterpret_cast<const uint32_unaligned_t *>(pos + 3);
__builtin_memcpy(&value, pos, 7);
break;
default:
__builtin_unreachable();
}
value_ptr[7] = size;
reinterpret_cast<unsigned char *>(&value)[7] = size;
return intHashCRC32(value, updated_value);
}

View File

@ -6,7 +6,7 @@
namespace DB
{
/** Opens a file /proc/self/mstat. Keeps it open and reads memory statistics via 'pread'.
/** Opens a file /proc/self/statm. Keeps it open and reads memory statistics via 'pread'.
* This is Linux specific.
* See: man procfs
*
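As a quick illustration of what that file contains (Linux only), it can be read directly; its seven fields are page counts (size, resident, shared, text, lib, data, dt):
``` bash
# Values are in pages; multiply by `getconf PAGESIZE` to get bytes
cat /proc/self/statm
```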

View File

@ -85,7 +85,7 @@ public:
struct Counter
{
Counter() {}
Counter() = default; //-V730
Counter(const TKey & k, UInt64 c = 0, UInt64 e = 0, size_t h = 0)
: key(k), slot(0), hash(h), count(c), error(e) {}
@ -148,7 +148,7 @@ public:
// Increase weight of a key that already exists
auto hash = counter_map.hash(key);
if (auto counter = findCounter(key, hash); counter)
if (auto * counter = findCounter(key, hash); counter)
{
counter->count += increment;
counter->error += error;
@ -159,12 +159,12 @@ public:
// Key doesn't exist, but can fit in the top K
if (unlikely(size() < capacity()))
{
auto c = new Counter(arena.emplace(key), increment, error, hash);
auto * c = new Counter(arena.emplace(key), increment, error, hash);
push(c);
return;
}
auto min = counter_list.back();
auto * min = counter_list.back();
// The key doesn't exist and cannot fit in the current top K, but
// the new key has a bigger weight and is virtually more present
// compared to the element that is less present in the set. This part
@ -218,7 +218,7 @@ public:
*/
if (m2 > 0)
{
for (auto counter : counter_list)
for (auto * counter : counter_list)
{
counter->count += m2;
counter->error += m2;
@ -226,10 +226,10 @@ public:
}
// The list is sorted in descending order, we have to scan in reverse
for (auto counter : boost::adaptors::reverse(rhs.counter_list))
for (auto * counter : boost::adaptors::reverse(rhs.counter_list))
{
size_t hash = counter_map.hash(counter->key);
if (auto current = findCounter(counter->key, hash))
if (auto * current = findCounter(counter->key, hash))
{
// Subtract m2 previously added, guaranteed not negative
current->count += (counter->count - m2);
@ -262,7 +262,7 @@ public:
std::vector<Counter> topK(size_t k) const
{
std::vector<Counter> res;
for (auto counter : counter_list)
for (auto * counter : counter_list)
{
res.push_back(*counter);
if (res.size() == k)
@ -274,7 +274,7 @@ public:
void write(WriteBuffer & wb) const
{
writeVarUInt(size(), wb);
for (auto counter : counter_list)
for (auto * counter : counter_list)
counter->write(wb);
writeVarUInt(alpha_map.size(), wb);
@ -290,7 +290,7 @@ public:
for (size_t i = 0; i < count; ++i)
{
auto counter = new Counter();
auto * counter = new Counter();
counter->read(rb);
counter->hash = counter_map.hash(counter->key);
push(counter);
@ -325,7 +325,7 @@ protected:
{
while (counter->slot > 0)
{
auto next = counter_list[counter->slot - 1];
auto * next = counter_list[counter->slot - 1];
if (*counter > *next)
{
std::swap(next->slot, counter->slot);
@ -339,7 +339,7 @@ protected:
private:
void destroyElements()
{
for (auto counter : counter_list)
for (auto * counter : counter_list)
{
arena.free(counter->key);
delete counter;
@ -376,7 +376,7 @@ private:
{
removed_keys = 0;
counter_map.clear();
for (auto counter : counter_list)
for (auto * counter : counter_list)
counter_map[counter->key] = counter;
}

View File

@ -310,7 +310,7 @@ public:
template <typename T, typename Z = void *>
using enable_if_not_field_or_stringlike_t = std::enable_if_t<!std::is_same_v<std::decay_t<T>, Field> && !std::is_same_v<NearestFieldType<std::decay_t<T>>, String>, Z>;
Field()
Field() //-V730
: which(Types::Null)
{
}
@ -851,7 +851,7 @@ decltype(auto) castToNearestFieldType(T && x)
}
template <typename T>
Field::Field(T && rhs, enable_if_not_field_or_stringlike_t<T>)
Field::Field(T && rhs, enable_if_not_field_or_stringlike_t<T>) //-V730
{
auto && val = castToNearestFieldType(std::forward<T>(rhs));
createConcrete(std::forward<decltype(val)>(val));

View File

@ -14,6 +14,7 @@ SRCS(
DiskFactory.cpp
DiskLocal.cpp
DiskMemory.cpp
DiskRestartProxy.cpp
DiskSelector.cpp
IDisk.cpp
IVolume.cpp

View File

@ -137,7 +137,7 @@ ColumnPtr wrapInNullable(const ColumnPtr & src, const ColumnsWithTypeAndName & a
if (const auto * nullable = checkAndGetColumn<ColumnNullable>(*elem.column))
{
const ColumnPtr & null_map_column = nullable->getNullMapColumnPtr();
if (!result_null_map_column)
if (!result_null_map_column) //-V1051
{
result_null_map_column = null_map_column;
}

View File

@ -47,6 +47,7 @@ SRCS(
ReadBufferAIO.cpp
ReadBufferFromFile.cpp
ReadBufferFromFileBase.cpp
ReadBufferFromFileDecorator.cpp
ReadBufferFromFileDescriptor.cpp
ReadBufferFromIStream.cpp
ReadBufferFromMemory.cpp
@ -57,6 +58,7 @@ SRCS(
UseSSL.cpp
WriteBufferFromFile.cpp
WriteBufferFromFileBase.cpp
WriteBufferFromFileDecorator.cpp
WriteBufferFromFileDescriptor.cpp
WriteBufferFromFileDescriptorDiscardOnFailure.cpp
WriteBufferFromHTTP.cpp

View File

@ -1043,12 +1043,12 @@ private:
*/
struct AggregateFunctionInstruction
{
const IAggregateFunction * that;
size_t state_offset;
const IColumn ** arguments;
const IAggregateFunction * batch_that;
const IColumn ** batch_arguments;
const UInt64 * offsets = nullptr;
const IAggregateFunction * that{};
size_t state_offset{};
const IColumn ** arguments{};
const IAggregateFunction * batch_that{};
const IColumn ** batch_arguments{};
const UInt64 * offsets{};
};
using AggregateFunctionInstructions = std::vector<AggregateFunctionInstruction>;

View File

@ -26,6 +26,8 @@
#include <Interpreters/DictionaryReader.h>
#include <Interpreters/Context.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/parseAggregateFunctionParameters.h>
@ -119,11 +121,14 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column)
return true;
}
ExpressionAnalyzerData::~ExpressionAnalyzerData() = default;
ExpressionAnalyzer::ExtractedSettings::ExtractedSettings(const Settings & settings_)
: use_index_for_in_with_subqueries(settings_.use_index_for_in_with_subqueries)
, size_limits_for_set(settings_.max_rows_in_set, settings_.max_bytes_in_set, settings_.set_overflow_mode)
{}
ExpressionAnalyzer::~ExpressionAnalyzer() = default;
ExpressionAnalyzer::ExpressionAnalyzer(
const ASTPtr & query_,
@ -746,11 +751,11 @@ static JoinPtr tryGetStorageJoin(std::shared_ptr<TableJoin> analyzed_join)
return {};
}
static ExpressionActionsPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
static ActionsDAGPtr createJoinedBlockActions(ContextPtr context, const TableJoin & analyzed_join)
{
ASTPtr expression_list = analyzed_join.rightKeysList();
auto syntax_result = TreeRewriter(context).analyze(expression_list, analyzed_join.columnsFromJoinedTable());
return ExpressionAnalyzer(expression_list, syntax_result, context).getActions(true, false);
return ExpressionAnalyzer(expression_list, syntax_result, context).getActionsDAG(true, false);
}
static bool allowDictJoin(StoragePtr joined_storage, ContextPtr context, String & dict_name, String & key_name)
@ -802,25 +807,22 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
const ASTTablesInSelectQueryElement & join_element, const ColumnsWithTypeAndName & left_sample_columns)
{
/// Two JOINs are not supported with the same subquery, but different USINGs.
auto join_hash = join_element.getTreeHash();
String join_subquery_id = toString(join_hash.first) + "_" + toString(join_hash.second);
SubqueryForSet & subquery_for_join = subqueries_for_sets[join_subquery_id];
if (joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Table join was already created for query");
/// Use StorageJoin if any.
if (!subquery_for_join.join)
subquery_for_join.join = tryGetStorageJoin(syntax->analyzed_join);
JoinPtr join = tryGetStorageJoin(syntax->analyzed_join);
if (!subquery_for_join.join)
if (!join)
{
/// Actions which need to be calculated on joined block.
ExpressionActionsPtr joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin());
auto joined_block_actions = createJoinedBlockActions(getContext(), analyzedJoin());
Names original_right_columns;
if (!subquery_for_join.source)
{
NamesWithAliases required_columns_with_aliases = analyzedJoin().getRequiredColumns(
joined_block_actions->getSampleBlock(), joined_block_actions->getRequiredColumns());
Block(joined_block_actions->getResultColumns()), joined_block_actions->getRequiredColumns().getNames());
for (auto & pr : required_columns_with_aliases)
original_right_columns.push_back(pr.first);
@ -831,38 +833,51 @@ JoinPtr SelectQueryExpressionAnalyzer::makeTableJoin(
*/
auto interpreter = interpretSubquery(join_element.table_expression, getContext(), original_right_columns, query_options);
subquery_for_join.makeSource(interpreter, std::move(required_columns_with_aliases));
{
joined_plan = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*joined_plan);
auto sample_block = interpreter->getSampleBlock();
auto rename_dag = std::make_unique<ActionsDAG>(sample_block.getColumnsWithTypeAndName());
for (const auto & name_with_alias : required_columns_with_aliases)
{
if (sample_block.has(name_with_alias.first))
{
auto pos = sample_block.getPositionByName(name_with_alias.first);
const auto & alias = rename_dag->addAlias(*rename_dag->getInputs()[pos], name_with_alias.second);
rename_dag->getIndex()[pos] = &alias;
}
}
/// TODO You do not need to set this up when JOIN is only needed on remote servers.
subquery_for_join.addJoinActions(joined_block_actions); /// changes subquery_for_join.sample_block inside
auto rename_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(rename_dag));
rename_step->setStepDescription("Rename joined columns");
joined_plan->addStep(std::move(rename_step));
}
const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName();
auto joined_actions_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), std::move(joined_block_actions));
joined_actions_step->setStepDescription("Joined actions");
joined_plan->addStep(std::move(joined_actions_step));
const ColumnsWithTypeAndName & right_sample_columns = joined_plan->getCurrentDataStream().header.getColumnsWithTypeAndName();
bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns);
if (need_convert)
subquery_for_join.addJoinActions(std::make_shared<ExpressionActions>(
syntax->analyzed_join->rightConvertingActions(),
ExpressionActionsSettings::fromContext(getContext())));
{
auto converting_step = std::make_unique<ExpressionStep>(joined_plan->getCurrentDataStream(), syntax->analyzed_join->rightConvertingActions());
converting_step->setStepDescription("Convert joined columns");
joined_plan->addStep(std::move(converting_step));
}
subquery_for_join.join = makeJoin(syntax->analyzed_join, subquery_for_join.sample_block, getContext());
join = makeJoin(syntax->analyzed_join, joined_plan->getCurrentDataStream().header, getContext());
/// Do not make subquery for join over dictionary.
if (syntax->analyzed_join->dictionary_reader)
{
JoinPtr join = subquery_for_join.join;
subqueries_for_sets.erase(join_subquery_id);
return join;
}
joined_plan.reset();
}
else
{
const ColumnsWithTypeAndName & right_sample_columns = subquery_for_join.sample_block.getColumnsWithTypeAndName();
bool need_convert = syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, right_sample_columns);
if (need_convert)
subquery_for_join.addJoinActions(std::make_shared<ExpressionActions>(syntax->analyzed_join->rightConvertingActions()));
}
syntax->analyzed_join->applyJoinKeyConvert(left_sample_columns, {});
return subquery_for_join.join;
return join;
}
ActionsDAGPtr SelectQueryExpressionAnalyzer::appendPrewhere(
@ -1345,6 +1360,11 @@ ExpressionActionsPtr ExpressionAnalyzer::getConstActions(const ColumnsWithTypeAn
return std::make_shared<ExpressionActions>(actions, ExpressionActionsSettings::fromContext(getContext()));
}
std::unique_ptr<QueryPlan> SelectQueryExpressionAnalyzer::getJoinedPlan()
{
return std::move(joined_plan);
}
ActionsDAGPtr SelectQueryExpressionAnalyzer::simpleSelectActions()
{
ExpressionActionsChain new_chain(getContext());

View File

@ -47,9 +47,13 @@ bool sanitizeBlock(Block & block, bool throw_if_cannot_create_column = false);
/// ExpressionAnalyzer sources, intermediates and results. It splits data and logic, allows to test them separately.
struct ExpressionAnalyzerData
{
~ExpressionAnalyzerData();
SubqueriesForSets subqueries_for_sets;
PreparedSets prepared_sets;
std::unique_ptr<QueryPlan> joined_plan;
/// Columns after ARRAY JOIN. If there is no ARRAY JOIN, it's source_columns.
NamesAndTypesList columns_after_array_join;
/// Columns after ARRAY JOIN and JOIN. If there is no JOIN, it's columns_after_array_join.
@ -99,6 +103,8 @@ public:
: ExpressionAnalyzer(query_, syntax_analyzer_result_, context_, 0, false, {})
{}
~ExpressionAnalyzer();
void appendExpression(ExpressionActionsChain & chain, const ASTPtr & expr, bool only_types);
/// If `ast` is not a SELECT query, just gets all the actions to evaluate the expression.
@ -293,6 +299,7 @@ public:
const AggregateDescriptions & aggregates() const { return aggregate_descriptions; }
const PreparedSets & getPreparedSets() const { return prepared_sets; }
std::unique_ptr<QueryPlan> getJoinedPlan();
/// Tables that will need to be sent to remote servers for distributed query processing.
const TemporaryTablesMapping & getExternalTables() const { return external_tables; }

View File

@ -1504,6 +1504,7 @@ BlockInputStreamPtr HashJoin::createStreamWithNonJoinedRows(const Block & result
void HashJoin::reuseJoinedData(const HashJoin & join)
{
data = join.data;
from_storage_join = true;
joinDispatch(kind, strictness, data->maps, [this](auto kind_, auto strictness_, auto & map)
{
used_flags.reinit<kind_, strictness_>(map.getBufferSizeInCells(data->type) + 1);

View File

@ -159,6 +159,8 @@ public:
void joinTotals(Block & block) const override;
bool isFilled() const override { return from_storage_join || data->type == Type::DICT; }
/** For RIGHT and FULL JOINs.
* A stream that will contain default values from the left table, joined with rows from the right table that were not joined before.
* Use only after all calls to joinBlock are done.
@ -344,6 +346,9 @@ private:
ASTTableJoin::Kind kind;
ASTTableJoin::Strictness strictness;
/// This join was created from StorageJoin and it is already filled.
bool from_storage_join = false;
/// Names of key columns in right-side table (in the order they appear in ON/USING clause). @note It could contain duplicates.
const Names & key_names_right;

View File

@ -41,6 +41,10 @@ public:
virtual size_t getTotalByteCount() const = 0;
virtual bool alwaysReturnsEmptySet() const { return false; }
/// StorageJoin/Dictionary is already filled. No need to call addJoinedBlock.
/// Different query plan is used for such joins.
virtual bool isFilled() const { return false; }
virtual BlockInputStreamPtr createStreamWithNonJoinedRows(const Block &, UInt64) const { return {}; }
};

View File

@ -47,6 +47,7 @@
#include <Processors/QueryPlan/FillingStep.h>
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/FinishSortingStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/LimitByStep.h>
#include <Processors/QueryPlan/LimitStep.h>
#include <Processors/QueryPlan/MergeSortingStep.h>
@ -1117,14 +1118,37 @@ void InterpreterSelectQuery::executeImpl(QueryPlan & query_plan, const BlockInpu
if (expressions.hasJoin())
{
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
if (expressions.join->isFilled())
{
QueryPlanStepPtr filled_join_step = std::make_unique<FilledJoinStep>(
query_plan.getCurrentDataStream(),
expressions.join,
expressions.join_has_delayed_stream,
settings.max_block_size);
filled_join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(filled_join_step));
}
else
{
auto joined_plan = query_analyzer->getJoinedPlan();
if (!joined_plan)
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no joined plan for query");
QueryPlanStepPtr join_step = std::make_unique<JoinStep>(
query_plan.getCurrentDataStream(),
joined_plan->getCurrentDataStream(),
expressions.join,
settings.max_block_size);
join_step->setStepDescription("JOIN");
query_plan.addStep(std::move(join_step));
std::vector<QueryPlanPtr> plans;
plans.emplace_back(std::make_unique<QueryPlan>(std::move(query_plan)));
plans.emplace_back(std::move(joined_plan));
query_plan = QueryPlan();
query_plan.unitePlans(std::move(join_step), {std::move(plans)});
}
}
if (expressions.hasWhere())

View File

@ -1,9 +1,6 @@
#include <Interpreters/SubqueryForSet.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/MergeJoin.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Interpreters/PreparedSets.h>
namespace DB
{
@ -13,65 +10,4 @@ SubqueryForSet::~SubqueryForSet() = default;
SubqueryForSet::SubqueryForSet(SubqueryForSet &&) = default;
SubqueryForSet & SubqueryForSet::operator= (SubqueryForSet &&) = default;
void SubqueryForSet::makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_)
{
joined_block_aliases = std::move(joined_block_aliases_);
source = std::make_unique<QueryPlan>();
interpreter->buildQueryPlan(*source);
sample_block = interpreter->getSampleBlock();
renameColumns(sample_block);
}
void SubqueryForSet::renameColumns(Block & block)
{
for (const auto & name_with_alias : joined_block_aliases)
{
if (block.has(name_with_alias.first))
{
auto pos = block.getPositionByName(name_with_alias.first);
auto column = block.getByPosition(pos);
block.erase(pos);
column.name = name_with_alias.second;
block.insert(std::move(column));
}
}
}
void SubqueryForSet::addJoinActions(ExpressionActionsPtr actions)
{
actions->execute(sample_block);
if (joined_block_actions == nullptr)
{
joined_block_actions = actions;
}
else
{
auto new_dag = ActionsDAG::merge(
std::move(*joined_block_actions->getActionsDAG().clone()),
std::move(*actions->getActionsDAG().clone()));
joined_block_actions = std::make_shared<ExpressionActions>(new_dag, actions->getSettings());
}
}
bool SubqueryForSet::insertJoinedBlock(Block & block)
{
renameColumns(block);
if (joined_block_actions)
joined_block_actions->execute(block);
return join->addJoinedBlock(block);
}
void SubqueryForSet::setTotals(Block totals)
{
if (join)
{
renameColumns(totals);
join->setTotals(totals);
}
}
}

View File

@ -2,19 +2,16 @@
#include <Core/Block.h>
#include <Storages/IStorage_fwd.h>
#include <Parsers/IAST.h>
#include <Interpreters/IJoin.h>
#include <Interpreters/PreparedSets.h>
namespace DB
{
class InterpreterSelectWithUnionQuery;
class ExpressionActions;
using ExpressionActionsPtr = std::shared_ptr<ExpressionActions>;
class QueryPlan;
class Set;
using SetPtr = std::shared_ptr<Set>;
/// Information on what to do when executing a subquery in the [GLOBAL] IN/JOIN section.
struct SubqueryForSet
{
@ -28,28 +25,10 @@ struct SubqueryForSet
/// If set, build it from result.
SetPtr set;
JoinPtr join;
/// Apply this actions to joined block.
ExpressionActionsPtr joined_block_actions;
Block sample_block; /// source->getHeader() + column renames
/// If set, put the result into the table.
/// This is a temporary table for transferring to remote servers for distributed query processing.
StoragePtr table;
void makeSource(std::shared_ptr<InterpreterSelectWithUnionQuery> & interpreter,
NamesWithAliases && joined_block_aliases_);
void addJoinActions(ExpressionActionsPtr actions);
bool insertJoinedBlock(Block & block);
void setTotals(Block totals);
private:
NamesWithAliases joined_block_aliases; /// Rename column from joined block from this list.
/// Rename source right table column names into qualified column names if they conflict with left table ones.
void renameColumns(Block & block);
};
/// ID of subquery -> what to do with it.

View File

@ -19,7 +19,6 @@
#include <Interpreters/ReplaceQueryParameterVisitor.h>
#include <Poco/Util/AbstractConfiguration.h>
namespace DB
{

View File

@ -7,6 +7,7 @@
#include <Processors/Transforms/CreatingSetsTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/MergingAggregatedMemoryEfficientTransform.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Processors/Formats/IOutputFormat.h>
#include <Processors/Sources/SourceFromInputStream.h>
#include <Processors/Executors/PipelineExecutor.h>
@ -96,6 +97,12 @@ void QueryPipeline::addTransform(ProcessorPtr transform)
pipe.addTransform(std::move(transform));
}
void QueryPipeline::addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes)
{
checkInitializedAndNotCompleted();
pipe.addTransform(std::move(transform), totals, extremes);
}
void QueryPipeline::transform(const Transformer & transformer)
{
checkInitializedAndNotCompleted();
@ -255,6 +262,96 @@ QueryPipeline QueryPipeline::unitePipelines(
return pipeline;
}
std::unique_ptr<QueryPipeline> QueryPipeline::joinPipelines(
std::unique_ptr<QueryPipeline> left,
std::unique_ptr<QueryPipeline> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors)
{
left->checkInitializedAndNotCompleted();
right->checkInitializedAndNotCompleted();
/// Extremes before join are useless. They will be calculated after if needed.
left->pipe.dropExtremes();
right->pipe.dropExtremes();
left->pipe.collected_processors = collected_processors;
right->pipe.collected_processors = collected_processors;
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool default_totals = false;
if (!left->hasTotals() && right->hasTotals())
{
left->addDefaultTotals();
default_totals = true;
}
/// (left) ──────┐
/// ╞> Joining ─> (joined)
/// (left) ─┐┌───┘
/// └┼───┐
/// (right) ┐ (totals) ──┼─┐ ╞> Joining ─> (joined)
/// ╞> Resize ┐ ╓─┘┌┼─┘
/// (right) ┘ │ ╟──┘└─┐
/// ╞> FillingJoin ─> Resize ╣ ╞> Joining ─> (totals)
/// (totals) ─────────┘ ╙─────┘
size_t num_streams = left->getNumStreams();
right->resize(1);
auto adding_joined = std::make_shared<FillingRightJoinSideTransform>(right->getHeader(), join);
InputPort * totals_port = nullptr;
if (right->hasTotals())
totals_port = adding_joined->addTotalsPort();
right->addTransform(std::move(adding_joined), totals_port, nullptr);
size_t num_streams_including_totals = num_streams + (left->hasTotals() ? 1 : 0);
right->resize(num_streams_including_totals);
/// This counter is needed for every Joining except totals, to decide which Joining will generate non joined rows.
auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(num_streams);
auto lit = left->pipe.output_ports.begin();
auto rit = right->pipe.output_ports.begin();
for (size_t i = 0; i < num_streams; ++i)
{
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, false, default_totals, finish_counter);
connect(**lit, joining->getInputs().front());
connect(**rit, joining->getInputs().back());
*lit = &joining->getOutputs().front();
++lit;
++rit;
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining));
}
if (left->hasTotals())
{
auto joining = std::make_shared<JoiningTransform>(left->getHeader(), join, max_block_size, true, default_totals);
connect(*left->pipe.totals_port, joining->getInputs().front());
connect(**rit, joining->getInputs().back());
left->pipe.totals_port = &joining->getOutputs().front();
++rit;
if (collected_processors)
collected_processors->emplace_back(joining);
left->pipe.processors.emplace_back(std::move(joining));
}
left->pipe.processors.insert(left->pipe.processors.end(), right->pipe.processors.begin(), right->pipe.processors.end());
left->pipe.holder = std::move(right->pipe.holder);
left->pipe.header = left->pipe.output_ports.front()->getHeader();
return left;
}
void QueryPipeline::addCreatingSetsTransform(const Block & res_header, SubqueryForSet subquery_for_set, const SizeLimits & limits, ContextPtr context)
{

View File

@ -27,6 +27,9 @@ struct SizeLimits;
struct ExpressionActionsSettings;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class QueryPipeline
{
public:
@ -52,6 +55,7 @@ public:
void addSimpleTransform(const Pipe::ProcessorGetterWithStreamKind & getter);
/// Add transform with getNumStreams() input ports.
void addTransform(ProcessorPtr transform);
void addTransform(ProcessorPtr transform, InputPort * totals, InputPort * extremes);
using Transformer = std::function<Processors(OutputPortRawPtrs ports)>;
/// Transform pipeline in general way.
@ -90,6 +94,15 @@ public:
size_t max_threads_limit = 0,
Processors * collected_processors = nullptr);
/// Join two pipelines together using JoinPtr.
/// If collector is used, it will collect only newly-added processors, but not processors from pipelines.
static std::unique_ptr<QueryPipeline> joinPipelines(
std::unique_ptr<QueryPipeline> left,
std::unique_ptr<QueryPipeline> right,
JoinPtr join,
size_t max_block_size,
Processors * collected_processors = nullptr);
/// Add other pipeline and execute it before current one.
/// Pipeline must have empty header, it should not generate any chunk.
/// This is used for CreatingSets.

View File

@ -55,8 +55,8 @@ void CreatingSetStep::describeActions(FormatSettings & settings) const
settings.out << prefix;
if (subquery_for_set.set)
settings.out << "Set: ";
else if (subquery_for_set.join)
settings.out << "Join: ";
// else if (subquery_for_set.join)
// settings.out << "Join: ";
settings.out << description << '\n';
}
@ -65,8 +65,8 @@ void CreatingSetStep::describeActions(JSONBuilder::JSONMap & map) const
{
if (subquery_for_set.set)
map.add("Set", description);
else if (subquery_for_set.join)
map.add("Join", description);
// else if (subquery_for_set.join)
// map.add("Join", description);
}
@ -134,8 +134,6 @@ void addCreatingSetsStep(
continue;
auto plan = std::move(set.source);
std::string type = (set.join != nullptr) ? "JOIN"
: "subquery";
auto creating_set = std::make_unique<CreatingSetStep>(
plan->getCurrentDataStream(),
@ -143,7 +141,7 @@ void addCreatingSetsStep(
std::move(set),
limits,
context);
creating_set->setStepDescription("Create set for " + type);
creating_set->setStepDescription("Create set for subquery");
plan->addStep(std::move(creating_set));
input_streams.emplace_back(plan->getCurrentDataStream());

View File

@ -28,22 +28,6 @@ static ITransformingStep::Traits getTraits(const ActionsDAGPtr & actions)
};
}
static ITransformingStep::Traits getJoinTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
ExpressionStep::ExpressionStep(const DataStream & input_stream_, ActionsDAGPtr actions_dag_)
: ITransformingStep(
input_stream_,
@ -118,47 +102,4 @@ void ExpressionStep::describeActions(JSONBuilder::JSONMap & map) const
map.add("Expression", expression->toTree());
}
JoinStep::JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
Transform::transformHeader(input_stream_.header, join_),
getJoinTraits())
, join(std::move(join_))
, has_non_joined_rows(has_non_joined_rows_)
, max_block_size(max_block_size_)
{
}
void JoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
/// In case joined subquery has totals, and we don't, add default chunk to totals.
bool add_default_totals = false;
if (!pipeline.hasTotals())
{
pipeline.addDefaultTotals();
add_default_totals = true;
}
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
return std::make_shared<Transform>(header, join, on_totals, add_default_totals);
});
if (has_non_joined_rows)
{
const Block & join_result_sample = pipeline.getHeader();
auto stream = std::make_shared<LazyNonJoinedBlockInputStream>(*join, join_result_sample, max_block_size);
auto source = std::make_shared<SourceFromInputStream>(std::move(stream));
source->setQueryPlanStep(this);
pipeline.addDelayedStream(source);
/// Now, after adding delayed stream, it has implicit dependency on other port.
/// Here we add resize processor to remove this dependency.
/// Otherwise, if we add MergeSorting + MergingSorted transform to pipeline, we could get `Pipeline stuck`
pipeline.resize(pipeline.getNumStreams(), true);
}
}
}

View File

@ -7,9 +7,6 @@ namespace DB
class ActionsDAG;
using ActionsDAGPtr = std::shared_ptr<ActionsDAG>;
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class ExpressionTransform;
class JoiningTransform;
@ -36,23 +33,4 @@ private:
ActionsDAGPtr actions_dag;
};
/// TODO: add separate step for join.
class JoinStep : public ITransformingStep
{
public:
using Transform = JoiningTransform;
explicit JoinStep(const DataStream & input_stream_, JoinPtr join_, bool has_non_joined_rows_, size_t max_block_size_);
String getName() const override { return "Join"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
bool has_non_joined_rows;
size_t max_block_size;
};
}

View File

@ -0,0 +1,89 @@
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPipeline.h>
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/IJoin.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
JoinStep::JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_)
: join(std::move(join_))
, max_block_size(max_block_size_)
{
input_streams = {left_stream_, right_stream_};
output_stream = DataStream
{
.header = JoiningTransform::transformHeader(left_stream_.header, join),
};
}
QueryPipelinePtr JoinStep::updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &)
{
if (pipelines.size() != 2)
throw Exception(ErrorCodes::LOGICAL_ERROR, "JoinStep expect two input steps");
return QueryPipeline::joinPipelines(std::move(pipelines[0]), std::move(pipelines[1]), join, max_block_size, &processors);
}
void JoinStep::describePipeline(FormatSettings & settings) const
{
IQueryPlanStep::describePipeline(processors, settings);
}
static ITransformingStep::Traits getStorageJoinTraits()
{
return ITransformingStep::Traits
{
{
.preserves_distinct_columns = false,
.returns_single_stream = false,
.preserves_number_of_streams = true,
.preserves_sorting = false,
},
{
.preserves_number_of_rows = false,
}
};
}
FilledJoinStep::FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_)
: ITransformingStep(
input_stream_,
JoiningTransform::transformHeader(input_stream_.header, join_),
getStorageJoinTraits())
, join(std::move(join_))
, max_block_size(max_block_size_)
{
if (!join->isFilled())
throw Exception(ErrorCodes::LOGICAL_ERROR, "FilledJoinStep expects Join to be filled");
}
void FilledJoinStep::transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &)
{
bool default_totals = false;
if (!pipeline.hasTotals() && join->hasTotals())
{
pipeline.addDefaultTotals();
default_totals = true;
}
auto finish_counter = std::make_shared<JoiningTransform::FinishCounter>(pipeline.getNumStreams());
pipeline.addSimpleTransform([&](const Block & header, QueryPipeline::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipeline::StreamType::Totals;
auto counter = on_totals ? nullptr : finish_counter;
return std::make_shared<JoiningTransform>(header, join, max_block_size, on_totals, default_totals, counter);
});
}
}

View File

@ -0,0 +1,50 @@
#pragma once
#include <Processors/QueryPlan/IQueryPlanStep.h>
#include <Processors/QueryPlan/ITransformingStep.h>
namespace DB
{
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
/// Join two data streams.
class JoinStep : public IQueryPlanStep
{
public:
JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
JoinPtr join_,
size_t max_block_size_);
String getName() const override { return "Join"; }
QueryPipelinePtr updatePipeline(QueryPipelines pipelines, const BuildQueryPipelineSettings &) override;
void describePipeline(FormatSettings & settings) const override;
const JoinPtr & getJoin() const { return join; }
private:
JoinPtr join;
size_t max_block_size;
Processors processors;
};
/// Special step for the case when Join is already filled.
/// For StorageJoin and Dictionary.
class FilledJoinStep : public ITransformingStep
{
public:
FilledJoinStep(const DataStream & input_stream_, JoinPtr join_, size_t max_block_size_);
String getName() const override { return "FilledJoin"; }
void transformPipeline(QueryPipeline & pipeline, const BuildQueryPipelineSettings &) override;
private:
JoinPtr join;
size_t max_block_size;
};
}
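
As the comments above note, JoinStep unites two input pipelines, while FilledJoinStep is a plain transforming step used when the join is already filled (StorageJoin, Dictionary). A minimal, hypothetical sketch of that dispatch — stub types only, not the real interpreter code — could look like this:

``` cpp
#include <iostream>
#include <memory>

/// Stub standing in for IJoin; only the tiny part of the interface used here.
struct Join
{
    bool filled = false;
    bool isFilled() const { return filled; }
};

/// Hypothetical helpers; the real interpreter adds QueryPlan steps instead of printing.
void addFilledJoinStep() { std::cout << "FilledJoinStep: transform the left stream only\n"; }
void addJoinStep()       { std::cout << "JoinStep: unite the left and right pipelines\n"; }

void buildJoin(const std::shared_ptr<Join> & join)
{
    if (join->isFilled())
        addFilledJoinStep();  /// right side already lives inside the join (StorageJoin, Dictionary)
    else
        addJoinStep();        /// right pipeline fills the join via FillingRightJoinSideTransform
}

int main()
{
    buildJoin(std::make_shared<Join>(Join{true}));
    buildJoin(std::make_shared<Join>(Join{false}));
}
```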

View File

@ -3,6 +3,7 @@
#include <Processors/QueryPlan/FilterStep.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/QueryPlan/JoinStep.h>
#include <Processors/QueryPlan/ArrayJoinStep.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/CubeStep.h>
@ -72,8 +73,8 @@ static size_t tryAddNewFilterStep(
/// Add new Filter step before Aggregating.
/// Expression/Filter -> Aggregating -> Something
auto & node = nodes.emplace_back();
node.children.swap(child_node->children);
child_node->children.emplace_back(&node);
node.children.emplace_back(&node);
std::swap(node.children[0], child_node->children[0]);
/// Expression/Filter -> Aggregating -> Filter -> Something
/// New filter column is the first one.
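
The rewiring above inserts the new Filter node between the aggregating node and its former first child while leaving any other children untouched. A standalone sketch of that emplace-then-swap trick, with a hypothetical Node type rather than the real QueryPlan::Node, might look like this:

``` cpp
#include <cassert>
#include <list>
#include <string>
#include <utility>
#include <vector>

struct Node
{
    std::string name;
    std::vector<Node *> children;
};

int main()
{
    std::list<Node> nodes;  /// stable addresses for the plan nodes
    Node & something = nodes.emplace_back(Node{"Something", {}});
    Node & aggregating = nodes.emplace_back(Node{"Aggregating", {&something}});

    /// Aggregating -> Something  becomes  Aggregating -> Filter -> Something.
    Node & filter = nodes.emplace_back(Node{"Filter", {}});
    filter.children.emplace_back(&filter);                   /// temporary self-reference as a slot
    std::swap(filter.children[0], aggregating.children[0]);  /// Filter now points to Something, Aggregating to Filter

    assert(aggregating.children[0] == &filter);
    assert(filter.children[0] == &something);
    return 0;
}
```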

View File

@ -45,8 +45,6 @@ void CreatingSetsTransform::startSubquery()
{
if (subquery.set)
LOG_TRACE(log, "Creating set.");
if (subquery.join)
LOG_TRACE(log, "Creating join.");
if (subquery.table)
LOG_TRACE(log, "Filling temporary table.");
@ -54,10 +52,9 @@ void CreatingSetsTransform::startSubquery()
table_out = subquery.table->write({}, subquery.table->getInMemoryMetadataPtr(), getContext());
done_with_set = !subquery.set;
done_with_join = !subquery.join;
done_with_table = !subquery.table;
if (done_with_set && done_with_join && done_with_table)
if (done_with_set /*&& done_with_join*/ && done_with_table)
throw Exception("Logical error: nothing to do with subquery", ErrorCodes::LOGICAL_ERROR);
if (table_out)
@ -72,8 +69,6 @@ void CreatingSetsTransform::finishSubquery()
if (subquery.set)
LOG_DEBUG(log, "Created Set with {} entries from {} rows in {} sec.", subquery.set->getTotalRowCount(), read_rows, seconds);
if (subquery.join)
LOG_DEBUG(log, "Created Join with {} entries from {} rows in {} sec.", subquery.join->getTotalRowCount(), read_rows, seconds);
if (subquery.table)
LOG_DEBUG(log, "Created Table with {} rows in {} sec.", read_rows, seconds);
}
@ -81,12 +76,6 @@ void CreatingSetsTransform::finishSubquery()
{
LOG_DEBUG(log, "Subquery has empty result.");
}
if (totals)
subquery.setTotals(getInputPort().getHeader().cloneWithColumns(totals.detachColumns()));
else
/// Set empty totals anyway, it is needed for MergeJoin.
subquery.setTotals({});
}
void CreatingSetsTransform::init()
@ -111,12 +100,6 @@ void CreatingSetsTransform::consume(Chunk chunk)
done_with_set = true;
}
if (!done_with_join)
{
if (!subquery.insertJoinedBlock(block))
done_with_join = true;
}
if (!done_with_table)
{
block = materializeBlock(block);
@ -130,7 +113,7 @@ void CreatingSetsTransform::consume(Chunk chunk)
done_with_table = true;
}
if (done_with_set && done_with_join && done_with_table)
if (done_with_set && done_with_table)
finishConsume();
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <DataStreams/SizeLimits.h>
#include <DataStreams/IBlockStream_fwd.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/SubqueryForSet.h>
#include <Processors/IAccumulatingTransform.h>
@ -43,7 +44,7 @@ private:
Stopwatch watch;
bool done_with_set = true;
bool done_with_join = true;
//bool done_with_join = true;
bool done_with_table = true;
SizeLimits network_transfer_limits;

View File

@ -1,10 +1,17 @@
#include <Processors/Transforms/JoiningTransform.h>
#include <Interpreters/ExpressionAnalyzer.h>
#include <Interpreters/ExpressionActions.h>
#include <DataStreams/IBlockInputStream.h>
#include <DataTypes/DataTypesNumber.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
{
ExtraBlockPtr tmp;
@ -12,13 +19,128 @@ Block JoiningTransform::transformHeader(Block header, const JoinPtr & join)
return header;
}
JoiningTransform::JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_, bool default_totals_)
: ISimpleTransform(input_header, transformHeader(input_header, join_), on_totals_)
JoiningTransform::JoiningTransform(
Block input_header,
JoinPtr join_,
size_t max_block_size_,
bool on_totals_,
bool default_totals_,
FinishCounterPtr finish_counter_)
: IProcessor({input_header}, {transformHeader(input_header, join_)})
, join(std::move(join_))
, on_totals(on_totals_)
, default_totals(default_totals_)
{}
, finish_counter(std::move(finish_counter_))
, max_block_size(max_block_size_)
{
if (!join->isFilled())
inputs.emplace_back(Block(), this);
}
IProcessor::Status JoiningTransform::prepare()
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished() || stop_reading)
{
output.finish();
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();
return Status::PortFull;
}
/// Output if has data.
if (has_output)
{
output.push(std::move(output_chunk));
has_output = false;
return Status::PortFull;
}
if (inputs.size() > 1)
{
auto & last_in = inputs.back();
if (!last_in.isFinished())
{
last_in.setNeeded();
if (last_in.hasData())
throw Exception(ErrorCodes::LOGICAL_ERROR, "No data is expected from second JoiningTransform port");
return Status::NeedData;
}
}
if (has_input)
return Status::Ready;
auto & input = inputs.front();
if (input.isFinished())
{
if (process_non_joined)
return Status::Ready;
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
input_chunk = input.pull(true);
has_input = true;
return Status::Ready;
}
void JoiningTransform::work()
{
if (has_input)
{
transform(input_chunk);
output_chunk.swap(input_chunk);
has_input = not_processed != nullptr;
has_output = !output_chunk.empty();
}
else
{
if (!non_joined_stream)
{
if (!finish_counter || !finish_counter->isLast())
{
process_non_joined = false;
return;
}
non_joined_stream = join->createStreamWithNonJoinedRows(outputs.front().getHeader(), max_block_size);
if (!non_joined_stream)
{
process_non_joined = false;
return;
}
}
auto block = non_joined_stream->read();
if (!block)
{
process_non_joined = false;
return;
}
auto rows = block.rows();
output_chunk.setColumns(block.getColumns(), rows);
has_output = true;
}
}
void JoiningTransform::transform(Chunk & chunk)
{
@ -28,7 +150,7 @@ void JoiningTransform::transform(Chunk & chunk)
if (join->alwaysReturnsEmptySet() && !on_totals)
{
stopReading();
stop_reading = true;
chunk.clear();
return;
}
@ -42,7 +164,7 @@ void JoiningTransform::transform(Chunk & chunk)
auto cols = chunk.detachColumns();
for (auto & col : cols)
col = col->cloneResized(1);
block = getInputPort().getHeader().cloneWithColumns(std::move(cols));
block = inputs.front().getHeader().cloneWithColumns(std::move(cols));
/// Drop totals if both out stream and joined stream doesn't have ones.
/// See comment in ExpressionTransform.h
@ -61,29 +183,122 @@ void JoiningTransform::transform(Chunk & chunk)
Block JoiningTransform::readExecute(Chunk & chunk)
{
Block res;
// std::cerr << "=== Chunk rows " << chunk.getNumRows() << " cols " << chunk.getNumColumns() << std::endl;
if (!not_processed)
{
// std::cerr << "!not_processed " << std::endl;
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
if (res)
join->joinBlock(res, not_processed);
}
else if (not_processed->empty()) /// There's not processed data inside expression.
{
// std::cerr << "not_processed->empty() " << std::endl;
if (chunk.hasColumns())
res = getInputPort().getHeader().cloneWithColumns(chunk.detachColumns());
res = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
not_processed.reset();
join->joinBlock(res, not_processed);
}
else
{
// std::cerr << "not not_processed->empty() " << std::endl;
res = std::move(not_processed->block);
join->joinBlock(res, not_processed);
}
// std::cerr << "Res block rows " << res.rows() << " cols " << res.columns() << std::endl;
return res;
}
FillingRightJoinSideTransform::FillingRightJoinSideTransform(Block input_header, JoinPtr join_)
: IProcessor({input_header}, {Block()})
, join(std::move(join_))
{}
InputPort * FillingRightJoinSideTransform::addTotalsPort()
{
if (inputs.size() > 1)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Totals port was already added to FillingRightJoinSideTransform");
return &inputs.emplace_back(inputs.front().getHeader(), this);
}
IProcessor::Status FillingRightJoinSideTransform::prepare()
{
auto & output = outputs.front();
/// Check can output.
if (output.isFinished())
{
for (auto & input : inputs)
input.close();
return Status::Finished;
}
if (!output.canPush())
{
for (auto & input : inputs)
input.setNotNeeded();
return Status::PortFull;
}
auto & input = inputs.front();
if (stop_reading)
{
input.close();
}
else if (!input.isFinished())
{
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
chunk = input.pull(true);
return Status::Ready;
}
if (inputs.size() > 1)
{
auto & totals_input = inputs.back();
if (!totals_input.isFinished())
{
totals_input.setNeeded();
if (!totals_input.hasData())
return Status::NeedData;
chunk = totals_input.pull(true);
for_totals = true;
return Status::Ready;
}
}
else if (!set_totals)
{
chunk.setColumns(inputs.front().getHeader().cloneEmpty().getColumns(), 0);
for_totals = true;
return Status::Ready;
}
output.finish();
return Status::Finished;
}
void FillingRightJoinSideTransform::work()
{
auto block = inputs.front().getHeader().cloneWithColumns(chunk.detachColumns());
if (for_totals)
join->setTotals(block);
else
stop_reading = !join->addJoinedBlock(block);
set_totals = for_totals;
}
}

View File

@ -1,5 +1,5 @@
#pragma once
#include <Processors/ISimpleTransform.h>
#include <Processors/IProcessor.h>
namespace DB
@ -8,21 +8,63 @@ namespace DB
class IJoin;
using JoinPtr = std::shared_ptr<IJoin>;
class JoiningTransform : public ISimpleTransform
class IBlockInputStream;
using BlockInputStreamPtr = std::shared_ptr<IBlockInputStream>;
/// Joins rows of a chunk from the left table.
/// This transform usually has two input ports and one output.
/// The first input is for data from the left table.
/// The second input has an empty header and is connected with FillingRightJoinSide.
/// We can process the left table only when the Join is filled. The second input is used to signal that FillingRightJoinSide is finished.
class JoiningTransform : public IProcessor
{
public:
JoiningTransform(Block input_header, JoinPtr join_,
bool on_totals_ = false, bool default_totals_ = false);
/// Count streams and check which is last.
/// The last one should process non-joined rows.
class FinishCounter
{
public:
explicit FinishCounter(size_t total_) : total(total_) {}
bool isLast()
{
return finished.fetch_add(1) + 1 >= total;
}
private:
const size_t total;
std::atomic<size_t> finished{0};
};
using FinishCounterPtr = std::shared_ptr<FinishCounter>;
JoiningTransform(
Block input_header,
JoinPtr join_,
size_t max_block_size_,
bool on_totals_ = false,
bool default_totals_ = false,
FinishCounterPtr finish_counter_ = nullptr);
String getName() const override { return "JoiningTransform"; }
static Block transformHeader(Block header, const JoinPtr & join);
Status prepare() override;
void work() override;
protected:
void transform(Chunk & chunk) override;
bool needInputData() const override { return !not_processed; }
void transform(Chunk & chunk);
private:
Chunk input_chunk;
Chunk output_chunk;
bool has_input = false;
bool has_output = false;
bool stop_reading = false;
bool process_non_joined = true;
JoinPtr join;
bool on_totals;
/// This flag means that we have manually added totals to our pipeline.
@ -33,7 +75,33 @@ private:
ExtraBlockPtr not_processed;
FinishCounterPtr finish_counter;
BlockInputStreamPtr non_joined_stream;
size_t max_block_size;
Block readExecute(Chunk & chunk);
};
/// Fills the Join with blocks from the right table.
/// Has a single input and a single output port.
/// The output port has an empty header. It is closed when all data is inserted into the join.
class FillingRightJoinSideTransform : public IProcessor
{
public:
FillingRightJoinSideTransform(Block input_header, JoinPtr join_);
String getName() const override { return "FillingRightJoinSide"; }
InputPort * addTotalsPort();
Status prepare() override;
void work() override;
private:
JoinPtr join;
Chunk chunk;
bool stop_reading = false;
bool for_totals = false;
bool set_totals = false;
};
}
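
FinishCounter is what lets several parallel JoiningTransforms agree on which one emits the non-joined rows: every stream bumps the shared counter when it finishes, and only the stream that observes the counter reach the total does the extra work. A self-contained sketch of that pattern (plain std::thread workers, not ClickHouse processors) for illustration:

``` cpp
#include <atomic>
#include <cstddef>
#include <cstdio>
#include <thread>
#include <vector>

/// Same idea as FinishCounter above: N workers finish in any order,
/// and exactly one of them (the last) runs the extra step.
class FinishCounter
{
public:
    explicit FinishCounter(size_t total_) : total(total_) {}

    bool isLast() { return finished.fetch_add(1) + 1 >= total; }

private:
    const size_t total;
    std::atomic<size_t> finished{0};
};

int main()
{
    const size_t num_streams = 4;
    FinishCounter counter(num_streams);

    std::vector<std::thread> workers;
    for (size_t i = 0; i < num_streams; ++i)
        workers.emplace_back([&counter, i]
        {
            /// ... join blocks from the left stream here ...
            if (counter.isLast())
                std::printf("stream %zu emits the non-joined rows\n", i);
        });

    for (auto & worker : workers)
        worker.join();
    return 0;
}
```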

View File

@ -107,6 +107,7 @@ SRCS(
QueryPlan/IQueryPlanStep.cpp
QueryPlan/ISourceStep.cpp
QueryPlan/ITransformingStep.cpp
QueryPlan/JoinStep.cpp
QueryPlan/LimitByStep.cpp
QueryPlan/LimitStep.cpp
QueryPlan/MergeSortingStep.cpp

View File

@ -34,6 +34,7 @@ namespace CurrentMetrics
{
extern const Metric DistributedSend;
extern const Metric DistributedFilesToInsert;
extern const Metric BrokenDistributedFilesToInsert;
}
namespace fs = std::filesystem;
@ -304,6 +305,7 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, log(&Poco::Logger::get(getLoggerName()))
, monitor_blocker(monitor_blocker_)
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
@ -368,20 +370,20 @@ void StorageDistributedDirectoryMonitor::run()
{
do_sleep = !processFiles(files);
std::lock_guard metrics_lock(metrics_mutex);
last_exception = std::exception_ptr{};
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
}
catch (...)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
do_sleep = true;
++error_count;
++status.error_count;
sleep_time = std::min(
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(error_count))},
std::chrono::milliseconds{Int64(default_sleep_time.count() * std::exp2(status.error_count))},
max_sleep_time);
tryLogCurrentException(getLoggerName().data());
last_exception = std::current_exception();
status.last_exception = std::current_exception();
}
}
else
@ -392,9 +394,9 @@ void StorageDistributedDirectoryMonitor::run()
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
error_count /= 2;
status.error_count /= 2;
last_decrease_time = now;
}
@ -500,16 +502,16 @@ std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
}
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
if (files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), files_count);
if (bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, bytes_count);
if (status.files_count != files.size())
LOG_TRACE(log, "Files set to {} (was {})", files.size(), status.files_count);
if (status.bytes_count != new_bytes_count)
LOG_TRACE(log, "Bytes set to {} (was {})", new_bytes_count, status.bytes_count);
metric_pending_files.changeTo(files.size());
files_count = files.size();
bytes_count = new_bytes_count;
status.files_count = files.size();
status.bytes_count = new_bytes_count;
}
return files;
@ -560,8 +562,9 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
writeRemoteConvert(distributed_header, remote, compression_expected, in, log);
remote.writeSuffix();
}
catch (const Exception & e)
catch (Exception & e)
{
e.addMessage(fmt::format("While sending {}", file_path));
maybeMarkAsBroken(file_path, e);
throw;
}
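
The catch clause above now takes the exception by non-const reference so the file path can be appended to its message before rethrowing. A standalone illustration of that append-context-and-rethrow idiom, using a simplified Exception type rather than DB::Exception, might be:

``` cpp
#include <cstdio>
#include <string>

/// Simplified stand-in for DB::Exception with an appendable message.
struct Exception
{
    std::string text;
    void addMessage(const std::string & extra) { text += ": " + extra; }
};

void sendFile(const std::string & /*file_path*/)
{
    throw Exception{"Connection reset by peer"};
}

void processFile(const std::string & file_path)
{
    try
    {
        sendFile(file_path);
    }
    catch (Exception & e)  /// non-const: we amend the exception object itself
    {
        e.addMessage("While sending " + file_path);
        throw;             /// rethrows the same, now amended, exception
    }
}

int main()
{
    try
    {
        processFile("/data/dist/1.bin");
    }
    catch (const Exception & e)
    {
        std::printf("%s\n", e.text.c_str());
    }
}
```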
@ -708,7 +711,7 @@ struct StorageDistributedDirectoryMonitor::Batch
if (remote)
remote->writeSuffix();
}
catch (const Exception & e)
catch (Exception & e)
{
if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
@ -716,8 +719,15 @@ struct StorageDistributedDirectoryMonitor::Batch
batch_broken = true;
}
else
{
std::vector<std::string> files;
files.reserve(file_index_to_path.size());
for (const auto & [index, name] : file_index_to_path)
files.push_back(name);
e.addMessage(fmt::format("While sending batch {}", fmt::join(files, "\n")));
throw;
}
}
if (!batch_broken)
{
@ -817,10 +827,10 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
return false;
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.add();
bytes_count += file_size;
++files_count;
status.bytes_count += file_size;
++status.files_count;
}
return task_handle->scheduleAfter(ms, false);
@ -828,16 +838,9 @@ bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t
StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::getStatus()
{
std::lock_guard metrics_lock(metrics_mutex);
return Status{
path,
last_exception,
error_count,
files_count,
bytes_count,
monitor_blocker.isCancelled(),
};
std::lock_guard status_lock(status_mutex);
Status current_status{status, path, monitor_blocker.isCancelled()};
return current_status;
}
void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
@ -964,11 +967,17 @@ void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_p
auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path + "/broken/");
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
size_t file_size = fs::file_size(file_path);
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
++status.broken_files_count;
status.broken_bytes_count += file_size;
metric_broken_files.add();
}
fs::rename(file_path, broken_file_path);
@ -980,10 +989,10 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
size_t file_size = fs::file_size(file_path);
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
metric_pending_files.sub();
--files_count;
bytes_count -= file_size;
--status.files_count;
status.bytes_count -= file_size;
}
fs::remove(file_path);
@ -1012,7 +1021,7 @@ void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_rela
std::lock_guard lock{mutex};
{
std::lock_guard metrics_lock(metrics_mutex);
std::lock_guard status_lock(status_mutex);
relative_path = new_relative_path;
path = disk->getPath() + relative_path + '/';
}

View File

@ -50,15 +50,23 @@ public:
/// For scheduling via DistributedBlockOutputStream
bool addAndSchedule(size_t file_size, size_t ms);
struct InternalStatus
{
std::exception_ptr last_exception;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
size_t broken_files_count = 0;
size_t broken_bytes_count = 0;
};
/// system.distribution_queue interface
struct Status
struct Status : InternalStatus
{
std::string path;
std::exception_ptr last_exception;
size_t error_count;
size_t files_count;
size_t bytes_count;
bool is_blocked;
bool is_blocked = false;
};
Status getStatus();
@ -92,11 +100,8 @@ private:
struct BatchHeader;
struct Batch;
std::mutex metrics_mutex;
size_t error_count = 0;
size_t files_count = 0;
size_t bytes_count = 0;
std::exception_ptr last_exception;
std::mutex status_mutex;
InternalStatus status;
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
@ -110,6 +115,7 @@ private:
BackgroundSchedulePoolTaskHolder task_handle;
CurrentMetrics::Increment metric_pending_files;
CurrentMetrics::Increment metric_broken_files;
friend class DirectoryMonitorBlockInputStream;
};
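
InternalStatus groups all the counters that used to be separate fields under a single status_mutex, so a status request copies one consistent snapshot and extends it with the path and blocked flag. A hypothetical, self-contained sketch of that pattern (names are illustrative, not the real monitor API):

``` cpp
#include <exception>
#include <mutex>
#include <string>

/// All mutable counters live in one struct guarded by one mutex,
/// so a status query copies a consistent snapshot under a single lock.
struct InternalStatus
{
    std::exception_ptr last_exception;
    size_t error_count = 0;
    size_t files_count = 0;
    size_t bytes_count = 0;
    size_t broken_files_count = 0;
    size_t broken_bytes_count = 0;
};

struct Status : InternalStatus
{
    std::string path;
    bool is_blocked = false;
};

class Monitor
{
public:
    void onFileAdded(size_t file_size)
    {
        std::lock_guard status_lock(status_mutex);
        ++status.files_count;
        status.bytes_count += file_size;
    }

    Status getStatus() const
    {
        std::lock_guard status_lock(status_mutex);
        return Status{status, path, /*is_blocked=*/ false};
    }

private:
    mutable std::mutex status_mutex;
    InternalStatus status;
    std::string path = "/var/lib/clickhouse/data/default/dist/";  /// illustrative only
};

int main()
{
    Monitor monitor;
    monitor.onFileAdded(1024);
    return monitor.getStatus().files_count == 1 ? 0 : 1;
}
```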

View File

@ -3128,7 +3128,6 @@ StorageReplicatedMergeTree::CreateMergeEntryResult StorageReplicatedMergeTree::c
entry.merge_type = merge_type;
entry.deduplicate = deduplicate;
entry.deduplicate_by_columns = deduplicate_by_columns;
entry.merge_type = merge_type;
entry.create_time = time(nullptr);
for (const auto & part : parts)
@ -5215,11 +5214,7 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
{
auto log_entries = zookeeper->getChildren(zookeeper_path + "/log");
if (log_entries.empty())
{
res.log_max_index = 0;
}
else
if (!log_entries.empty())
{
const String & last_log_entry = *std::max_element(log_entries.begin(), log_entries.end());
res.log_max_index = parse<UInt64>(last_log_entry.substr(strlen("log-")));
@ -5231,7 +5226,6 @@ void StorageReplicatedMergeTree::getStatus(Status & res, bool with_zk_fields)
auto all_replicas = zookeeper->getChildren(zookeeper_path + "/replicas");
res.total_replicas = all_replicas.size();
res.active_replicas = 0;
for (const String & replica : all_replicas)
if (zookeeper->exists(zookeeper_path + "/replicas/" + replica + "/is_active"))
++res.active_replicas;
@ -5365,7 +5359,7 @@ void StorageReplicatedMergeTree::fetchPartition(
ContextPtr query_context)
{
Macros::MacroExpansionInfo info;
info.expand_special_macros_only = false;
info.expand_special_macros_only = false; //-V1048
info.table_id = getStorageID();
info.table_id.uuid = UUIDHelpers::Nil;
auto expand_from = query_context->getMacros()->expand(from_, info);
@ -6317,7 +6311,7 @@ void StorageReplicatedMergeTree::movePartitionToTable(const StoragePtr & dest_ta
entry_delete.type = LogEntry::DROP_RANGE;
entry_delete.source_replica = replica_name;
entry_delete.new_part_name = drop_range_fake_part_name;
entry_delete.detach = false;
entry_delete.detach = false; //-V1048
entry_delete.create_time = time(nullptr);
}

View File

@ -98,6 +98,8 @@ NamesAndTypesList StorageSystemDistributionQueue::getNamesAndTypes()
{ "error_count", std::make_shared<DataTypeUInt64>() },
{ "data_files", std::make_shared<DataTypeUInt64>() },
{ "data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "broken_data_files", std::make_shared<DataTypeUInt64>() },
{ "broken_data_compressed_bytes", std::make_shared<DataTypeUInt64>() },
{ "last_exception", std::make_shared<DataTypeString>() },
};
}
@ -181,6 +183,8 @@ void StorageSystemDistributionQueue::fillData(MutableColumns & res_columns, Cont
res_columns[col_num++]->insert(status.error_count);
res_columns[col_num++]->insert(status.files_count);
res_columns[col_num++]->insert(status.bytes_count);
res_columns[col_num++]->insert(status.broken_files_count);
res_columns[col_num++]->insert(status.broken_bytes_count);
if (status.last_exception)
res_columns[col_num++]->insert(getExceptionMessage(status.last_exception, false));

View File

@ -376,74 +376,6 @@ def test_in_memory(start_cluster):
"Wide\t1\n")
def test_in_memory_wal(start_cluster):
# Merges are disabled in config
for i in range(5):
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
def check(node, rows, parts):
node.query("SELECT count() FROM wal_table") == "{}\n".format(rows)
node.query(
"SELECT count() FROM system.parts WHERE table = 'wal_table' AND part_type = 'InMemory'") == "{}\n".format(
parts)
check(node11, 250, 5)
check(node12, 250, 5)
# WAL works at inserts
node11.restart_clickhouse(kill=True)
check(node11, 250, 5)
# WAL works at fetches
node12.restart_clickhouse(kill=True)
check(node12, 250, 5)
insert_random_data('wal_table', node11, 50)
node12.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
# Disable replication
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
check(node11, 300, 6)
wal_file = "/var/lib/clickhouse/data/default/wal_table/wal.bin"
# Corrupt wal file
# Truncate it to its size minus 10 bytes
node11.exec_in_container(['bash', '-c', 'truncate --size="$(($(stat -c "%s" {}) - 10))" {}'.format(wal_file, wal_file)],
privileged=True, user='root')
node11.restart_clickhouse(kill=True)
# Broken part is lost, but other restored successfully
check(node11, 250, 5)
# WAL with blocks from 0 to 4
broken_wal_file = "/var/lib/clickhouse/data/default/wal_table/wal_0_4.bin"
# Check file exists
node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)])
# Fetch lost part from replica
node11.query("SYSTEM SYNC REPLICA wal_table", timeout=20)
check(node11, 300, 6)
# Check that new data is written to the new wal, but the old one still exists for restoring
# Check file not empty
node11.exec_in_container(['bash', '-c', 'test -s {}'.format(wal_file)])
# Check file exists
node11.exec_in_container(['bash', '-c', 'test -f {}'.format(broken_wal_file)])
# Data is lost without WAL
node11.query("ALTER TABLE wal_table MODIFY SETTING in_memory_parts_enable_wal = 0")
with PartitionManager() as pm:
pm.partition_instances(node11, node12)
insert_random_data('wal_table', node11, 50)
check(node11, 350, 7)
node11.restart_clickhouse(kill=True)
check(node11, 300, 6)
def test_in_memory_wal_rotate(start_cluster):
# Write every part to single wal
node11.query("ALTER TABLE restore_table MODIFY SETTING write_ahead_log_max_bytes = 10")

View File

@ -2467,8 +2467,6 @@ def test_kafka_issue14202(kafka_cluster):
kafka_format = 'JSONEachRow';
''')
time.sleep(3)
instance.query(
'INSERT INTO test.kafka_q SELECT t, some_string FROM ( SELECT dt AS t, some_string FROM test.empty_table )')
# check instance is alive

View File

@ -1,6 +1,6 @@
INSERT
1 0 1 1
1 0 1 1 0 0
FLUSH
1 0 0 0
1 0 0 0 0 0
UNBLOCK
0 0 0 0
0 0 0 0 0 0

View File

@ -10,15 +10,15 @@ select * from system.distribution_queue;
select 'INSERT';
system stop distributed sends dist_01293;
insert into dist_01293 select * from numbers(10);
select is_blocked, error_count, data_files, data_compressed_bytes>100 from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes>100, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
system flush distributed dist_01293;
select 'FLUSH';
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select 'UNBLOCK';
system start distributed sends dist_01293;
select is_blocked, error_count, data_files, data_compressed_bytes from system.distribution_queue where database = currentDatabase();
select is_blocked, error_count, data_files, data_compressed_bytes, broken_data_files, broken_data_compressed_bytes from system.distribution_queue where database = currentDatabase();
drop table null_01293;
drop table dist_01293;