Merge branch 'master' into build/bump-llvm-15

This commit is contained in:
Azat Khuzhin 2022-10-17 22:13:11 +03:00 committed by GitHub
commit 123fe3e5df
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
42 changed files with 1651 additions and 50 deletions

View File

@ -1,10 +1,460 @@
---
slug: /zh/getting-started/example-datasets/brown-benchmark
sidebar_label: Brown University Benchmark
description: A new analytical benchmark for machine-generated log data
title: "Brown University Benchmark"
sidebar_label: 布朗大学基准
description: 机器生成日志数据的新分析基准
title: "布朗大学基准"
---
import Content from '@site/docs/en/getting-started/example-datasets/brown-benchmark.md';
`MgBench` 是机器生成的日志数据的新分析基准,[Andrew Crotty](http://cs.brown.edu/people/acrotty/)。
<Content />
下载数据:
```bash
wget https://datasets.clickhouse.com/mgbench{1..3}.csv.xz
```
解压数据:
```bash
xz -v -d mgbench{1..3}.csv.xz
```
创建数据库和表:
```sql
CREATE DATABASE mgbench;
```
```sql
USE mgbench;
```
```sql
CREATE TABLE mgbench.logs1 (
log_time DateTime,
machine_name LowCardinality(String),
machine_group LowCardinality(String),
cpu_idle Nullable(Float32),
cpu_nice Nullable(Float32),
cpu_system Nullable(Float32),
cpu_user Nullable(Float32),
cpu_wio Nullable(Float32),
disk_free Nullable(Float32),
disk_total Nullable(Float32),
part_max_used Nullable(Float32),
load_fifteen Nullable(Float32),
load_five Nullable(Float32),
load_one Nullable(Float32),
mem_buffers Nullable(Float32),
mem_cached Nullable(Float32),
mem_free Nullable(Float32),
mem_shared Nullable(Float32),
swap_free Nullable(Float32),
bytes_in Nullable(Float32),
bytes_out Nullable(Float32)
)
ENGINE = MergeTree()
ORDER BY (machine_group, machine_name, log_time);
```
```sql
CREATE TABLE mgbench.logs2 (
log_time DateTime,
client_ip IPv4,
request String,
status_code UInt16,
object_size UInt64
)
ENGINE = MergeTree()
ORDER BY log_time;
```
```sql
CREATE TABLE mgbench.logs3 (
log_time DateTime64,
device_id FixedString(15),
device_name LowCardinality(String),
device_type LowCardinality(String),
device_floor UInt8,
event_type LowCardinality(String),
event_unit FixedString(1),
event_value Nullable(Float32)
)
ENGINE = MergeTree()
ORDER BY (event_type, log_time);
```
插入数据:
```
clickhouse-client --query "INSERT INTO mgbench.logs1 FORMAT CSVWithNames" < mgbench1.csv
clickhouse-client --query "INSERT INTO mgbench.logs2 FORMAT CSVWithNames" < mgbench2.csv
clickhouse-client --query "INSERT INTO mgbench.logs3 FORMAT CSVWithNames" < mgbench3.csv
```
## 运行基准查询:
```sql
USE mgbench;
```
```sql
-- Q1.1: 自午夜以来每个 Web 服务器的 CPU/网络利用率是多少?
SELECT machine_name,
MIN(cpu) AS cpu_min,
MAX(cpu) AS cpu_max,
AVG(cpu) AS cpu_avg,
MIN(net_in) AS net_in_min,
MAX(net_in) AS net_in_max,
AVG(net_in) AS net_in_avg,
MIN(net_out) AS net_out_min,
MAX(net_out) AS net_out_max,
AVG(net_out) AS net_out_avg
FROM (
SELECT machine_name,
COALESCE(cpu_user, 0.0) AS cpu,
COALESCE(bytes_in, 0.0) AS net_in,
COALESCE(bytes_out, 0.0) AS net_out
FROM logs1
WHERE machine_name IN ('anansi','aragog','urd')
AND log_time >= TIMESTAMP '2017-01-11 00:00:00'
) AS r
GROUP BY machine_name;
```
```sql
-- Q1.2:最近一天有哪些机房的机器离线?
SELECT machine_name,
log_time
FROM logs1
WHERE (machine_name LIKE 'cslab%' OR
machine_name LIKE 'mslab%')
AND load_one IS NULL
AND log_time >= TIMESTAMP '2017-01-10 00:00:00'
ORDER BY machine_name,
log_time;
```
```sql
-- Q1.3:特定工作站过去 10 天的每小时的平均指标是多少?
SELECT dt,
hr,
AVG(load_fifteen) AS load_fifteen_avg,
AVG(load_five) AS load_five_avg,
AVG(load_one) AS load_one_avg,
AVG(mem_free) AS mem_free_avg,
AVG(swap_free) AS swap_free_avg
FROM (
SELECT CAST(log_time AS DATE) AS dt,
EXTRACT(HOUR FROM log_time) AS hr,
load_fifteen,
load_five,
load_one,
mem_free,
swap_free
FROM logs1
WHERE machine_name = 'babbage'
AND load_fifteen IS NOT NULL
AND load_five IS NOT NULL
AND load_one IS NOT NULL
AND mem_free IS NOT NULL
AND swap_free IS NOT NULL
AND log_time >= TIMESTAMP '2017-01-01 00:00:00'
) AS r
GROUP BY dt,
hr
ORDER BY dt,
hr;
```
```sql
-- Q1.4: 1 个月内,每台服务器的磁盘 I/O 阻塞的频率是多少?
SELECT machine_name,
COUNT(*) AS spikes
FROM logs1
WHERE machine_group = 'Servers'
AND cpu_wio > 0.99
AND log_time >= TIMESTAMP '2016-12-01 00:00:00'
AND log_time < TIMESTAMP '2017-01-01 00:00:00'
GROUP BY machine_name
ORDER BY spikes DESC
LIMIT 10;
```
```sql
-- Q1.5:哪些外部可访问的虚拟机的运行内存不足?
SELECT machine_name,
dt,
MIN(mem_free) AS mem_free_min
FROM (
SELECT machine_name,
CAST(log_time AS DATE) AS dt,
mem_free
FROM logs1
WHERE machine_group = 'DMZ'
AND mem_free IS NOT NULL
) AS r
GROUP BY machine_name,
dt
HAVING MIN(mem_free) < 10000
ORDER BY machine_name,
dt;
```
```sql
-- Q1.6: 每小时所有文件服务器的总网络流量是多少?
SELECT dt,
hr,
SUM(net_in) AS net_in_sum,
SUM(net_out) AS net_out_sum,
SUM(net_in) + SUM(net_out) AS both_sum
FROM (
SELECT CAST(log_time AS DATE) AS dt,
EXTRACT(HOUR FROM log_time) AS hr,
COALESCE(bytes_in, 0.0) / 1000000000.0 AS net_in,
COALESCE(bytes_out, 0.0) / 1000000000.0 AS net_out
FROM logs1
WHERE machine_name IN ('allsorts','andes','bigred','blackjack','bonbon',
'cadbury','chiclets','cotton','crows','dove','fireball','hearts','huey',
'lindt','milkduds','milkyway','mnm','necco','nerds','orbit','peeps',
'poprocks','razzles','runts','smarties','smuggler','spree','stride',
'tootsie','trident','wrigley','york')
) AS r
GROUP BY dt,
hr
ORDER BY both_sum DESC
LIMIT 10;
```
```sql
-- Q2.1:过去 2 周内哪些请求导致了服务器错误?
SELECT *
FROM logs2
WHERE status_code >= 500
AND log_time >= TIMESTAMP '2012-12-18 00:00:00'
ORDER BY log_time;
```
```sql
-- Q2.2:在特定的某 2 周内,用户密码文件是否被泄露了?
SELECT *
FROM logs2
WHERE status_code >= 200
AND status_code < 300
AND request LIKE '%/etc/passwd%'
AND log_time >= TIMESTAMP '2012-05-06 00:00:00'
AND log_time < TIMESTAMP '2012-05-20 00:00:00';
```
```sql
-- Q2.3:过去一个月顶级请求的平均路径深度是多少?
SELECT top_level,
AVG(LENGTH(request) - LENGTH(REPLACE(request, '/', ''))) AS depth_avg
FROM (
SELECT SUBSTRING(request FROM 1 FOR len) AS top_level,
request
FROM (
SELECT POSITION(SUBSTRING(request FROM 2), '/') AS len,
request
FROM logs2
WHERE status_code >= 200
AND status_code < 300
AND log_time >= TIMESTAMP '2012-12-01 00:00:00'
) AS r
WHERE len > 0
) AS s
WHERE top_level IN ('/about','/courses','/degrees','/events',
'/grad','/industry','/news','/people',
'/publications','/research','/teaching','/ugrad')
GROUP BY top_level
ORDER BY top_level;
```
```sql
-- Q2.4:在过去的 3 个月里,哪些客户端发出了过多的请求?
SELECT client_ip,
COUNT(*) AS num_requests
FROM logs2
WHERE log_time >= TIMESTAMP '2012-10-01 00:00:00'
GROUP BY client_ip
HAVING COUNT(*) >= 100000
ORDER BY num_requests DESC;
```
```sql
-- Q2.5:每天的独立访问者数量是多少?
SELECT dt,
COUNT(DISTINCT client_ip)
FROM (
SELECT CAST(log_time AS DATE) AS dt,
client_ip
FROM logs2
) AS r
GROUP BY dt
ORDER BY dt;
```
```sql
-- Q2.6平均和最大数据传输速率Gbps是多少
SELECT AVG(transfer) / 125000000.0 AS transfer_avg,
MAX(transfer) / 125000000.0 AS transfer_max
FROM (
SELECT log_time,
SUM(object_size) AS transfer
FROM logs2
GROUP BY log_time
) AS r;
```
```sql
-- Q3.1:自 2019/11/29 17:00 以来,室温是否达到过冰点?
SELECT *
FROM logs3
WHERE event_type = 'temperature'
AND event_value <= 32.0
AND log_time >= '2019-11-29 17:00:00.000';
```
```sql
-- Q3.4:在过去的 6 个月里,每扇门打开的频率是多少?
SELECT device_name,
device_floor,
COUNT(*) AS ct
FROM logs3
WHERE event_type = 'door_open'
AND log_time >= '2019-06-01 00:00:00.000'
GROUP BY device_name,
device_floor
ORDER BY ct DESC;
```
下面的查询 3.5 使用了 UNION 关键词。设置该模式以便组合 SELECT 的查询结果。该设置仅在未明确指定 UNION ALL 或 UNION DISTINCT 但使用了 UNION 进行共享时使用。
```sql
SET union_default_mode = 'DISTINCT'
```
```sql
-- Q3.5: 在冬季和夏季,建筑物内哪些地方会出现较大的温度变化?
WITH temperature AS (
SELECT dt,
device_name,
device_type,
device_floor
FROM (
SELECT dt,
hr,
device_name,
device_type,
device_floor,
AVG(event_value) AS temperature_hourly_avg
FROM (
SELECT CAST(log_time AS DATE) AS dt,
EXTRACT(HOUR FROM log_time) AS hr,
device_name,
device_type,
device_floor,
event_value
FROM logs3
WHERE event_type = 'temperature'
) AS r
GROUP BY dt,
hr,
device_name,
device_type,
device_floor
) AS s
GROUP BY dt,
device_name,
device_type,
device_floor
HAVING MAX(temperature_hourly_avg) - MIN(temperature_hourly_avg) >= 25.0
)
SELECT DISTINCT device_name,
device_type,
device_floor,
'WINTER'
FROM temperature
WHERE dt >= DATE '2018-12-01'
AND dt < DATE '2019-03-01'
UNION
SELECT DISTINCT device_name,
device_type,
device_floor,
'SUMMER'
FROM temperature
WHERE dt >= DATE '2019-06-01'
AND dt < DATE '2019-09-01';
```
```sql
-- Q3.6:对于每种类别的设备,每月的功耗指标是什么?
SELECT yr,
mo,
SUM(coffee_hourly_avg) AS coffee_monthly_sum,
AVG(coffee_hourly_avg) AS coffee_monthly_avg,
SUM(printer_hourly_avg) AS printer_monthly_sum,
AVG(printer_hourly_avg) AS printer_monthly_avg,
SUM(projector_hourly_avg) AS projector_monthly_sum,
AVG(projector_hourly_avg) AS projector_monthly_avg,
SUM(vending_hourly_avg) AS vending_monthly_sum,
AVG(vending_hourly_avg) AS vending_monthly_avg
FROM (
SELECT dt,
yr,
mo,
hr,
AVG(coffee) AS coffee_hourly_avg,
AVG(printer) AS printer_hourly_avg,
AVG(projector) AS projector_hourly_avg,
AVG(vending) AS vending_hourly_avg
FROM (
SELECT CAST(log_time AS DATE) AS dt,
EXTRACT(YEAR FROM log_time) AS yr,
EXTRACT(MONTH FROM log_time) AS mo,
EXTRACT(HOUR FROM log_time) AS hr,
CASE WHEN device_name LIKE 'coffee%' THEN event_value END AS coffee,
CASE WHEN device_name LIKE 'printer%' THEN event_value END AS printer,
CASE WHEN device_name LIKE 'projector%' THEN event_value END AS projector,
CASE WHEN device_name LIKE 'vending%' THEN event_value END AS vending
FROM logs3
WHERE device_type = 'meter'
) AS r
GROUP BY dt,
yr,
mo,
hr
) AS s
GROUP BY yr,
mo
ORDER BY yr,
mo;
```
此数据集可在 [Playground](https://play.clickhouse.com/play?user=play) 中进行交互式的请求, [example](https://play.clickhouse.com/play?user=play#U0VMRUNUIG1hY2hpbmVfbmFtZSwKICAgICAgIE1JTihjcHUpIEFTIGNwdV9taW4sCiAgICAgICBNQVgoY3B1KSBBUyBjcHVfbWF4LAogICAgICAgQVZHKGNwdSkgQVMgY3B1X2F2ZywKICAgICAgIE1JTihuZXRfaW4pIEFTIG5ldF9pbl9taW4sCiAgICAgICBNQVgobmV0X2luKSBBUyBuZXRfaW5fbWF4LAogICAgICAgQVZHKG5ldF9pbikgQVMgbmV0X2luX2F2ZywKICAgICAgIE1JTihuZXRfb3V0KSBBUyBuZXRfb3V0X21pbiwKICAgICAgIE1BWChuZXRfb3V0KSBBUyBuZXRfb3V0X21heCwKICAgICAgIEFWRyhuZXRfb3V0KSBBUyBuZXRfb3V0X2F2ZwpGUk9NICgKICBTRUxFQ1QgbWFjaGluZV9uYW1lLAogICAgICAgICBDT0FMRVNDRShjcHVfdXNlciwgMC4wKSBBUyBjcHUsCiAgICAgICAgIENPQUxFU0NFKGJ5dGVzX2luLCAwLjApIEFTIG5ldF9pbiwKICAgICAgICAgQ09BTEVTQ0UoYnl0ZXNfb3V0LCAwLjApIEFTIG5ldF9vdXQKICBGUk9NIG1nYmVuY2gubG9nczEKICBXSEVSRSBtYWNoaW5lX25hbWUgSU4gKCdhbmFuc2knLCdhcmFnb2cnLCd1cmQnKQogICAgQU5EIGxvZ190aW1lID49IFRJTUVTVEFNUCAnMjAxNy0wMS0xMSAwMDowMDowMCcKKSBBUyByCkdST1VQIEJZIG1hY2hpbmVfbmFtZQ==).

View File

@ -1,6 +1,6 @@
module github.com/ClickHouse/ClickHouse/programs/diagnostics
go 1.17
go 1.19
require (
github.com/ClickHouse/clickhouse-go/v2 v2.0.12

View File

@ -65,7 +65,6 @@ github.com/Azure/go-autorest/logger v0.2.0/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZ
github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/ClickHouse/clickhouse-go v1.5.3 h1:Vok8zUb/wlqc9u8oEqQzBMBRDoFd8NxPRqgYEqMnV88=
github.com/ClickHouse/clickhouse-go v1.5.3/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI=
github.com/ClickHouse/clickhouse-go/v2 v2.0.12 h1:Nbl/NZwoM6LGJm7smNBgvtdr/rxjlIssSW3eG/Nmb9E=
github.com/ClickHouse/clickhouse-go/v2 v2.0.12/go.mod h1:u4RoNQLLM2W6hNSPYrIESLJqaWSInZVmfM+MlaAhXcg=
@ -457,7 +456,6 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/hashicorp/consul/api v1.11.0/go.mod h1:XjsvQN+RJGWI2TWy1/kqaE16HrR2J/FWgkYjdZQsX9M=
github.com/hashicorp/consul/api v1.12.0/go.mod h1:6pVBMo0ebnYdt2S3H87XhekM/HHrUoTD2XXb/VrZVy0=
github.com/hashicorp/consul/sdk v0.8.0/go.mod h1:GBvyrGALthsZObzUGsfgHZQDXjg4lOjagTIwIR1vPms=
github.com/hashicorp/errwrap v0.0.0-20141028054710-7554cd9344ce/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
@ -663,9 +661,7 @@ github.com/paulmach/protoscan v0.2.1-0.20210522164731-4e53c6875432/go.mod h1:2sV
github.com/pelletier/go-toml v1.9.4 h1:tjENF6MfZAg8e4ZmZTeWaWiT2vXtsoO6+iuOjFhECwM=
github.com/pelletier/go-toml v1.9.4/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c=
github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU=
github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I=
github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY=
github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE=
github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4=
github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -717,7 +713,6 @@ github.com/russross/blackfriday/v2 v2.1.0/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQD
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/safchain/ethtool v0.0.0-20190326074333-42ed695e3de8/go.mod h1:Z0q5wiBQGYcxhMZ6gUqHn6pYNLypFAvaL3UvgZLR0U4=
github.com/sagikazarmark/crypt v0.3.0/go.mod h1:uD/D+6UF4SrIR1uGEv7bBNkNqLGqUr43MRiaGWX1Nig=
github.com/sagikazarmark/crypt v0.4.0/go.mod h1:ALv2SRj7GxYV4HO9elxH9nS6M9gW+xDNxqmyJ6RfDFM=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
@ -1083,7 +1078,6 @@ golang.org/x/sys v0.0.0-20211109184856-51b60fd695b3/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20211110154304-99a53858aa08/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211124211545-fe61309f8881/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211205182925-97ca703d548d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211210111614-af8b64212486/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9 h1:XfKQ4OlFl8okEOr5UvAqFRVj8pY/4yfcXrddB8qAbU0=
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -1202,7 +1196,6 @@ google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdr
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
google.golang.org/api v0.61.0/go.mod h1:xQRti5UdCmoCEqFxcz93fTl338AVqDgyaDRuOZ3hg9I=
google.golang.org/api v0.62.0/go.mod h1:dKmwPCydfsad4qCH08MSdgWjfHOyfpd4VtDGgRFdavw=
google.golang.org/api v0.63.0/go.mod h1:gs4ij2ffTRXwuzzgJl/56BdwJaA194ijkfn++9tDuPo=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.5.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=

View File

@ -82,13 +82,17 @@
#if USE_BORINGSSL
#include <Compression/CompressionCodecEncrypted.h>
#endif
#include <Server/HTTP/HTTPServerConnectionFactory.h>
#include <Server/MySQLHandlerFactory.h>
#include <Server/PostgreSQLHandlerFactory.h>
#include <Server/ProxyV1HandlerFactory.h>
#include <Server/TLSHandlerFactory.h>
#include <Server/CertificateReloader.h>
#include <Server/ProtocolServerAdapter.h>
#include <Server/HTTP/HTTPServer.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <filesystem>
#include <unordered_set>
#include "config.h"
#include "config_version.h"
@ -387,7 +391,16 @@ bool getListenTry(const Poco::Util::AbstractConfiguration & config)
{
bool listen_try = config.getBool("listen_try", false);
if (!listen_try)
listen_try = DB::getMultipleValuesFromConfig(config, "", "listen_host").empty();
{
Poco::Util::AbstractConfiguration::Keys protocols;
config.keys("protocols", protocols);
listen_try =
DB::getMultipleValuesFromConfig(config, "", "listen_host").empty() &&
std::none_of(protocols.begin(), protocols.end(), [&](const auto & protocol)
{
return config.has("protocols." + protocol + ".host") && config.has("protocols." + protocol + ".port");
});
}
return listen_try;
}
@ -1837,6 +1850,82 @@ int Server::main(const std::vector<std::string> & /*args*/)
return Application::EXIT_OK;
}
std::unique_ptr<TCPProtocolStackFactory> Server::buildProtocolStackFromConfig(
const Poco::Util::AbstractConfiguration & config,
const std::string & protocol,
Poco::Net::HTTPServerParams::Ptr http_params,
AsynchronousMetrics & async_metrics,
bool & is_secure)
{
auto create_factory = [&](const std::string & type, const std::string & conf_name) -> TCPServerConnectionFactory::Ptr
{
if (type == "tcp")
return TCPServerConnectionFactory::Ptr(new TCPHandlerFactory(*this, false, false));
if (type == "tls")
#if USE_SSL
return TCPServerConnectionFactory::Ptr(new TLSHandlerFactory(*this, conf_name));
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
if (type == "proxy1")
return TCPServerConnectionFactory::Ptr(new ProxyV1HandlerFactory(*this, conf_name));
if (type == "mysql")
return TCPServerConnectionFactory::Ptr(new MySQLHandlerFactory(*this));
if (type == "postgres")
return TCPServerConnectionFactory::Ptr(new PostgreSQLHandlerFactory(*this));
if (type == "http")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(context(), http_params, createHandlerFactory(*this, config, async_metrics, "HTTPHandler-factory"))
);
if (type == "prometheus")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(context(), http_params, createHandlerFactory(*this, config, async_metrics, "PrometheusHandler-factory"))
);
if (type == "interserver")
return TCPServerConnectionFactory::Ptr(
new HTTPServerConnectionFactory(context(), http_params, createHandlerFactory(*this, config, async_metrics, "InterserverIOHTTPHandler-factory"))
);
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol configuration error, unknown protocol name '{}'", type);
};
std::string conf_name = "protocols." + protocol;
std::string prefix = conf_name + ".";
std::unordered_set<std::string> pset {conf_name};
auto stack = std::make_unique<TCPProtocolStackFactory>(*this, conf_name);
while (true)
{
// if there is no "type" - it's a reference to another protocol and this is just an endpoint
if (config.has(prefix + "type"))
{
std::string type = config.getString(prefix + "type");
if (type == "tls")
{
if (is_secure)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' contains more than one TLS layer", protocol);
is_secure = true;
}
stack->append(create_factory(type, conf_name));
}
if (!config.has(prefix + "impl"))
break;
conf_name = "protocols." + config.getString(prefix + "impl");
prefix = conf_name + ".";
if (!pset.insert(conf_name).second)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
}
return stack;
}
void Server::createServers(
Poco::Util::AbstractConfiguration & config,
@ -1855,6 +1944,55 @@ void Server::createServers(
http_params->setTimeout(settings.http_receive_timeout);
http_params->setKeepAliveTimeout(keep_alive_timeout);
Poco::Util::AbstractConfiguration::Keys protocols;
config.keys("protocols", protocols);
for (const auto & protocol : protocols)
{
std::vector<std::string> hosts;
if (config.has("protocols." + protocol + ".host"))
hosts.push_back(config.getString("protocols." + protocol + ".host"));
else
hosts = listen_hosts;
for (const auto & host : hosts)
{
std::string conf_name = "protocols." + protocol;
std::string prefix = conf_name + ".";
if (!config.has(prefix + "port"))
continue;
std::string description {"<undefined> protocol"};
if (config.has(prefix + "description"))
description = config.getString(prefix + "description");
std::string port_name = prefix + "port";
bool is_secure = false;
auto stack = buildProtocolStackFromConfig(config, protocol, http_params, async_metrics, is_secure);
if (stack->empty())
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' stack empty", protocol);
createServer(config, host, port_name.c_str(), listen_try, start_servers, servers, [&](UInt16 port) -> ProtocolServerAdapter
{
Poco::Net::ServerSocket socket;
auto address = socketBindListen(config, socket, host, port, is_secure);
socket.setReceiveTimeout(settings.receive_timeout);
socket.setSendTimeout(settings.send_timeout);
return ProtocolServerAdapter(
host,
port_name.c_str(),
description + ": " + address.toString(),
std::make_unique<TCPServer>(
stack.release(),
server_pool,
socket,
new Poco::Net::TCPServerParams));
});
}
}
for (const auto & listen_host : listen_hosts)
{
/// HTTP
@ -2102,13 +2240,50 @@ void Server::updateServers(
{
if (!server.isStopping())
{
bool has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(server.getPortName(), "").empty();
std::string port_name = server.getPortName();
bool has_host = false;
bool is_http = false;
if (port_name.starts_with("protocols."))
{
std::string protocol = port_name.substr(0, port_name.find_last_of('.'));
has_host = config.has(protocol + ".host");
/// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications.
bool is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
std::string conf_name = protocol;
std::string prefix = protocol + ".";
std::unordered_set<std::string> pset {conf_name};
while (true)
{
if (config.has(prefix + "type"))
{
std::string type = config.getString(prefix + "type");
if (type == "http")
{
is_http = true;
break;
}
}
if (!config.has(prefix + "impl"))
break;
conf_name = "protocols." + config.getString(prefix + "impl");
prefix = conf_name + ".";
if (!pset.insert(conf_name).second)
throw Exception(ErrorCodes::INVALID_CONFIG_PARAMETER, "Protocol '{}' configuration contains a loop on '{}'", protocol, conf_name);
}
}
else
{
/// NOTE: better to compare using getPortName() over using
/// dynamic_cast<> since HTTPServer is also used for prometheus and
/// internal replication communications.
is_http = server.getPortName() == "http_port" || server.getPortName() == "https_port";
}
if (!has_host)
has_host = std::find(listen_hosts.begin(), listen_hosts.end(), server.getListenHost()) != listen_hosts.end();
bool has_port = !config.getString(port_name, "").empty();
bool force_restart = is_http && !isSameConfiguration(previous_config, config, "http_handlers");
if (force_restart)
LOG_TRACE(log, "<http_handlers> had been changed, will reload {}", server.getDescription());

View File

@ -3,6 +3,8 @@
#include <Server/IServer.h>
#include <Daemon/BaseDaemon.h>
#include <Server/TCPProtocolStackFactory.h>
#include <Poco/Net/HTTPServerParams.h>
/** Server provides three interfaces:
* 1. HTTP - simple interface for any applications.
@ -77,6 +79,13 @@ private:
UInt16 port,
[[maybe_unused]] bool secure = false) const;
std::unique_ptr<TCPProtocolStackFactory> buildProtocolStackFromConfig(
const Poco::Util::AbstractConfiguration & config,
const std::string & protocol,
Poco::Net::HTTPServerParams::Ptr http_params,
AsynchronousMetrics & async_metrics,
bool & is_secure);
using CreateServerFunc = std::function<ProtocolServerAdapter(UInt16)>;
void createServer(
Poco::Util::AbstractConfiguration & config,

View File

@ -2,6 +2,9 @@
#include "config.h"
#include <memory>
#include <string>
#include <Columns/ColumnMap.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
@ -33,14 +36,18 @@ class FunctionShowCertificate : public IFunction
public:
static constexpr auto name = "showCertificate";
static FunctionPtr create(ContextPtr)
static FunctionPtr create(ContextPtr ctx)
{
#if !defined(USE_SSL) || USE_SSL == 0
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED, "SSL support is disabled");
#endif
return std::make_shared<FunctionShowCertificate>();
return std::make_shared<FunctionShowCertificate>(ctx->getQueryContext()->getClientInfo().certificate);
}
std::string certificate;
explicit FunctionShowCertificate(const std::string & certificate_ = "") : certificate(certificate_) {}
String getName() const override { return name; }
size_t getNumberOfArguments() const override { return 0; }
@ -61,7 +68,15 @@ public:
if (input_rows_count)
{
#if USE_SSL
if (const X509 * cert = SSL_CTX_get0_certificate(Poco::Net::SSLManager::instance().defaultServerContext()->sslContext()))
std::unique_ptr<Poco::Crypto::X509Certificate> x509_cert;
if (!certificate.empty())
x509_cert = std::make_unique<Poco::Crypto::X509Certificate>(certificate);
const X509 * cert = x509_cert ?
x509_cert->certificate() :
SSL_CTX_get0_certificate(Poco::Net::SSLManager::instance().defaultServerContext()->sslContext());
if (cert)
{
BIO * b = BIO_new(BIO_s_mem());
SCOPE_EXIT(

View File

@ -69,6 +69,7 @@ public:
Interface interface = Interface::TCP;
bool is_secure = false;
String certificate;
/// For tcp
String os_user;

View File

@ -244,7 +244,7 @@ void Session::shutdownNamedSessions()
NamedSessionsStorage::instance().shutdown();
}
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure)
Session::Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure, const std::string & certificate)
: auth_id(UUIDHelpers::generateV4()),
global_context(global_context_),
log(&Poco::Logger::get(String{magic_enum::enum_name(interface_)} + "-Session"))
@ -252,6 +252,7 @@ Session::Session(const ContextPtr & global_context_, ClientInfo::Interface inter
prepared_client_info.emplace();
prepared_client_info->interface = interface_;
prepared_client_info->is_secure = is_secure;
prepared_client_info->certificate = certificate;
}
Session::~Session()

View File

@ -32,7 +32,7 @@ public:
/// Stops using named sessions. The method must be called at the server shutdown.
static void shutdownNamedSessions();
Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure = false);
Session(const ContextPtr & global_context_, ClientInfo::Interface interface_, bool is_secure = false, const std::string & certificate = "");
~Session();
Session(const Session &&) = delete;

View File

@ -582,10 +582,28 @@ static std::tuple<ASTPtr, BlockIO> executeQueryImpl(
std::shared_ptr<const EnabledQuota> quota;
std::unique_ptr<IInterpreter> interpreter;
bool async_insert = false;
auto * queue = context->getAsynchronousInsertQueue();
const bool async_insert = queue
&& insert_query && !insert_query->select
&& insert_query->hasInlinedData() && settings.async_insert;
if (insert_query && settings.async_insert)
{
String reason;
if (!queue)
reason = "asynchronous insert queue is not configured";
else if (insert_query->select)
reason = "insert query has select";
else if (!insert_query->hasInlinedData())
reason = "insert query doesn't have inlined data";
else
async_insert = true;
if (!async_insert)
{
LOG_DEBUG(&Poco::Logger::get("executeQuery"),
"Setting async_insert=1, but INSERT query will be executed synchronously (reason: {})", reason);
}
}
if (async_insert)
{

View File

@ -0,0 +1,123 @@
#include <Server/ProxyV1Handler.h>
#include <Poco/Net/NetException.h>
#include <Common/NetException.h>
#include <Interpreters/Context.h>
namespace DB
{
namespace ErrorCodes
{
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int CANNOT_READ_FROM_SOCKET;
extern const int CANNOT_PARSE_INPUT_ASSERTION_FAILED;
}
void ProxyV1Handler::run()
{
const auto & settings = server.context()->getSettingsRef();
socket().setReceiveTimeout(settings.receive_timeout);
std::string word;
bool eol;
// Read PROXYv1 protocol header
// http://www.haproxy.org/download/1.8/doc/proxy-protocol.txt
// read "PROXY"
if (!readWord(5, word, eol) || word != "PROXY" || eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
// read "TCP4" or "TCP6" or "UNKNOWN"
if (!readWord(7, word, eol))
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
if (word != "TCP4" && word != "TCP6" && word != "UNKNOWN")
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
if (word == "UNKNOWN" && eol)
return;
if (eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
// read address
if (!readWord(39, word, eol) || eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
stack_data.forwarded_for = std::move(word);
// read address
if (!readWord(39, word, eol) || eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
// read port
if (!readWord(5, word, eol) || eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
// read port and "\r\n"
if (!readWord(5, word, eol) || !eol)
throw ParsingException("PROXY protocol violation", ErrorCodes::CANNOT_PARSE_INPUT_ASSERTION_FAILED);
}
bool ProxyV1Handler::readWord(int max_len, std::string & word, bool & eol)
{
word.clear();
eol = false;
char ch = 0;
int n = 0;
bool is_cr = false;
try
{
for (++max_len; max_len > 0 || is_cr; --max_len)
{
n = socket().receiveBytes(&ch, 1);
if (n == 0)
{
socket().shutdown();
return false;
}
if (n < 0)
break;
if (is_cr)
return ch == 0x0A;
if (ch == 0x0D)
{
is_cr = true;
eol = true;
continue;
}
if (ch == ' ')
return true;
word.push_back(ch);
}
}
catch (const Poco::Net::NetException & e)
{
throw NetException(e.displayText() + ", while reading from socket (" + socket().peerAddress().toString() + ")", ErrorCodes::NETWORK_ERROR);
}
catch (const Poco::TimeoutException &)
{
throw NetException(fmt::format("Timeout exceeded while reading from socket ({}, {} ms)",
socket().peerAddress().toString(),
socket().getReceiveTimeout().totalMilliseconds()), ErrorCodes::SOCKET_TIMEOUT);
}
catch (const Poco::IOException & e)
{
throw NetException(e.displayText() + ", while reading from socket (" + socket().peerAddress().toString() + ")", ErrorCodes::NETWORK_ERROR);
}
if (n < 0)
throw NetException("Cannot read from socket (" + socket().peerAddress().toString() + ")", ErrorCodes::CANNOT_READ_FROM_SOCKET);
return false;
}
}

View File

@ -0,0 +1,29 @@
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <Server/IServer.h>
#include <Server/TCPProtocolStackData.h>
namespace DB
{
class ProxyV1Handler : public Poco::Net::TCPServerConnection
{
using StreamSocket = Poco::Net::StreamSocket;
public:
explicit ProxyV1Handler(const StreamSocket & socket, IServer & server_, const std::string & conf_name_, TCPProtocolStackData & stack_data_)
: Poco::Net::TCPServerConnection(socket), server(server_), conf_name(conf_name_), stack_data(stack_data_) {}
void run() override;
protected:
bool readWord(int max_len, std::string & word, bool & eol);
private:
IServer & server;
std::string conf_name;
TCPProtocolStackData & stack_data;
};
}

View File

@ -0,0 +1,56 @@
#pragma once
#include <Poco/Net/NetException.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Server/ProxyV1Handler.h>
#include <Common/logger_useful.h>
#include <Server/IServer.h>
#include <Server/TCPServer.h>
#include <Server/TCPProtocolStackData.h>
namespace DB
{
class ProxyV1HandlerFactory : public TCPServerConnectionFactory
{
private:
IServer & server;
Poco::Logger * log;
std::string conf_name;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
public:
using Poco::Net::TCPServerConnection::TCPServerConnection;
void run() override {}
};
public:
explicit ProxyV1HandlerFactory(IServer & server_, const std::string & conf_name_)
: server(server_), log(&Poco::Logger::get("ProxyV1HandlerFactory")), conf_name(conf_name_)
{
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override
{
TCPProtocolStackData stack_data;
return createConnection(socket, tcp_server, stack_data);
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer &/* tcp_server*/, TCPProtocolStackData & stack_data) override
{
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new ProxyV1Handler(socket, server, conf_name, stack_data);
}
catch (const Poco::Net::NetException &)
{
LOG_TRACE(log, "TCP Request. Client is not connected (most likely RST packet was sent).");
return new DummyTCPHandler(socket);
}
}
};
}

View File

@ -109,6 +109,18 @@ TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::N
{
}
TCPHandler::TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_)
: Poco::Net::TCPServerConnection(socket_)
, server(server_)
, tcp_server(tcp_server_)
, log(&Poco::Logger::get("TCPHandler"))
, forwarded_for(stack_data.forwarded_for)
, certificate(stack_data.certificate)
, default_database(stack_data.default_database)
, server_display_name(std::move(server_display_name_))
{
}
TCPHandler::~TCPHandler()
{
try
@ -1060,7 +1072,7 @@ std::unique_ptr<Session> TCPHandler::makeSession()
{
auto interface = is_interserver_mode ? ClientInfo::Interface::TCP_INTERSERVER : ClientInfo::Interface::TCP;
auto res = std::make_unique<Session>(server.context(), interface, socket().secure());
auto res = std::make_unique<Session>(server.context(), interface, socket().secure(), certificate);
auto & client_info = res->getClientInfo();
client_info.forwarded_for = forwarded_for;
@ -1087,6 +1099,7 @@ void TCPHandler::receiveHello()
UInt64 packet_type = 0;
String user;
String password;
String default_db;
readVarUInt(packet_type, *in);
if (packet_type != Protocol::Client::Hello)
@ -1108,7 +1121,9 @@ void TCPHandler::receiveHello()
readVarUInt(client_version_minor, *in);
// NOTE For backward compatibility of the protocol, client cannot send its version_patch.
readVarUInt(client_tcp_protocol_version, *in);
readStringBinary(default_database, *in);
readStringBinary(default_db, *in);
if (!default_db.empty())
default_database = default_db;
readStringBinary(user, *in);
readStringBinary(password, *in);

View File

@ -22,6 +22,7 @@
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
#include "IServer.h"
#include "Server/TCPProtocolStackData.h"
#include "base/types.h"
@ -137,6 +138,7 @@ public:
* Proxy-forwarded (original client) IP address is used for quota accounting if quota is keyed by forwarded IP.
*/
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, bool parse_proxy_protocol_, std::string server_display_name_);
TCPHandler(IServer & server_, TCPServer & tcp_server_, const Poco::Net::StreamSocket & socket_, TCPProtocolStackData & stack_data, std::string server_display_name_);
~TCPHandler() override;
void run() override;
@ -151,6 +153,7 @@ private:
Poco::Logger * log;
String forwarded_for;
String certificate;
String client_name;
UInt64 client_version_major = 0;

View File

@ -3,6 +3,7 @@
#include <Poco/Net/NetException.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Common/logger_useful.h>
#include "Server/TCPProtocolStackData.h"
#include <Server/IServer.h>
#include <Server/TCPHandler.h>
#include <Server/TCPServerConnectionFactory.h>
@ -53,6 +54,21 @@ public:
return new DummyTCPHandler(socket);
}
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server, TCPProtocolStackData & stack_data) override
{
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPHandler(server, tcp_server, socket, stack_data, server_display_name);
}
catch (const Poco::Net::NetException &)
{
LOG_TRACE(log, "TCP Request. Client is not connected (most likely RST packet was sent).");
return new DummyTCPHandler(socket);
}
}
};
}

View File

@ -0,0 +1,22 @@
#pragma once
#include <string>
#include <Poco/Net/StreamSocket.h>
namespace DB
{
// Data to communicate between protocol layers
struct TCPProtocolStackData
{
// socket implementation can be replaced by some layer - TLS as an example
Poco::Net::StreamSocket socket;
// host from PROXY layer
std::string forwarded_for;
// certificate path from TLS layer to TCP layer
std::string certificate;
// default database from endpoint configuration to TCP layer
std::string default_database;
};
}

View File

@ -0,0 +1,92 @@
#pragma once
#include <Server/TCPServerConnectionFactory.h>
#include <Server/IServer.h>
#include <Server/TCPProtocolStackHandler.h>
#include <Poco/Logger.h>
#include <Poco/Net/NetException.h>
#include <Common/logger_useful.h>
#include <Access/Common/AllowedClientHosts.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ADDRESS_PATTERN_TYPE;
extern const int IP_ADDRESS_NOT_ALLOWED;
}
class TCPProtocolStackFactory : public TCPServerConnectionFactory
{
private:
IServer & server [[maybe_unused]];
Poco::Logger * log;
std::string conf_name;
std::vector<TCPServerConnectionFactory::Ptr> stack;
AllowedClientHosts allowed_client_hosts;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
public:
using Poco::Net::TCPServerConnection::TCPServerConnection;
void run() override {}
};
public:
template <typename... T>
explicit TCPProtocolStackFactory(IServer & server_, const std::string & conf_name_, T... factory)
: server(server_), log(&Poco::Logger::get("TCPProtocolStackFactory")), conf_name(conf_name_), stack({factory...})
{
const auto & config = server.config();
/// Fill list of allowed hosts.
const auto networks_config = conf_name + ".networks";
if (config.has(networks_config))
{
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(networks_config, keys);
for (const String & key : keys)
{
String value = config.getString(networks_config + "." + key);
if (key.starts_with("ip"))
allowed_client_hosts.addSubnet(value);
else if (key.starts_with("host_regexp"))
allowed_client_hosts.addNameRegexp(value);
else if (key.starts_with("host"))
allowed_client_hosts.addName(value);
else
throw Exception("Unknown address pattern type: " + key, ErrorCodes::UNKNOWN_ADDRESS_PATTERN_TYPE);
}
}
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override
{
if (!allowed_client_hosts.empty() && !allowed_client_hosts.contains(socket.peerAddress().host()))
throw Exception("Connections from " + socket.peerAddress().toString() + " are not allowed", ErrorCodes::IP_ADDRESS_NOT_ALLOWED);
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TCPProtocolStackHandler(server, tcp_server, socket, stack, conf_name);
}
catch (const Poco::Net::NetException &)
{
LOG_TRACE(log, "TCP Request. Client is not connected (most likely RST packet was sent).");
return new DummyTCPHandler(socket);
}
}
void append(TCPServerConnectionFactory::Ptr factory)
{
stack.push_back(std::move(factory));
}
size_t size() { return stack.size(); }
bool empty() { return stack.empty(); }
};
}

View File

@ -0,0 +1,46 @@
#pragma once
#include <Server/TCPServerConnectionFactory.h>
#include <Server/TCPServer.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/IServer.h>
#include <Server/TCPProtocolStackData.h>
namespace DB
{
class TCPProtocolStackHandler : public Poco::Net::TCPServerConnection
{
using StreamSocket = Poco::Net::StreamSocket;
using TCPServerConnection = Poco::Net::TCPServerConnection;
private:
IServer & server;
TCPServer & tcp_server;
std::vector<TCPServerConnectionFactory::Ptr> stack;
std::string conf_name;
public:
TCPProtocolStackHandler(IServer & server_, TCPServer & tcp_server_, const StreamSocket & socket, const std::vector<TCPServerConnectionFactory::Ptr> & stack_, const std::string & conf_name_)
: TCPServerConnection(socket), server(server_), tcp_server(tcp_server_), stack(stack_), conf_name(conf_name_)
{}
void run() override
{
const auto & conf = server.config();
TCPProtocolStackData stack_data;
stack_data.socket = socket();
stack_data.default_database = conf.getString(conf_name + ".default_database", "");
for (auto & factory : stack)
{
std::unique_ptr<TCPServerConnection> connection(factory->createConnection(socket(), tcp_server, stack_data));
connection->run();
if (stack_data.socket != socket())
socket() = stack_data.socket;
}
}
};
}

View File

@ -1,6 +1,7 @@
#pragma once
#include <Poco/SharedPtr.h>
#include <Server/TCPProtocolStackData.h>
namespace Poco
{
@ -23,5 +24,9 @@ public:
/// Same as Poco::Net::TCPServerConnectionFactory except we can pass the TCPServer
virtual Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) = 0;
virtual Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server, TCPProtocolStackData &/* stack_data */)
{
return createConnection(socket, tcp_server);
}
};
}

59
src/Server/TLSHandler.h Normal file
View File

@ -0,0 +1,59 @@
#pragma once
#include <Poco/Net/TCPServerConnection.h>
#include <Poco/SharedPtr.h>
#include <Common/Exception.h>
#include <Server/TCPProtocolStackData.h>
#if USE_SSL
# include <Poco/Net/Context.h>
# include <Poco/Net/SecureStreamSocket.h>
# include <Poco/Net/SSLManager.h>
#endif
namespace DB
{
namespace ErrorCodes
{
extern const int SUPPORT_IS_DISABLED;
}
class TLSHandler : public Poco::Net::TCPServerConnection
{
#if USE_SSL
using SecureStreamSocket = Poco::Net::SecureStreamSocket;
using SSLManager = Poco::Net::SSLManager;
using Context = Poco::Net::Context;
#endif
using StreamSocket = Poco::Net::StreamSocket;
public:
explicit TLSHandler(const StreamSocket & socket, const std::string & key_, const std::string & certificate_, TCPProtocolStackData & stack_data_)
: Poco::Net::TCPServerConnection(socket)
, key(key_)
, certificate(certificate_)
, stack_data(stack_data_)
{}
void run() override
{
#if USE_SSL
auto ctx = SSLManager::instance().defaultServerContext();
if (!key.empty() && !certificate.empty())
ctx = new Context(Context::Usage::SERVER_USE, key, certificate, ctx->getCAPaths().caLocation);
socket() = SecureStreamSocket::attach(socket(), ctx);
stack_data.socket = socket();
stack_data.certificate = certificate;
#else
throw Exception{"SSL support for TCP protocol is disabled because Poco library was built without NetSSL support.",
ErrorCodes::SUPPORT_IS_DISABLED};
#endif
}
private:
std::string key [[maybe_unused]];
std::string certificate [[maybe_unused]];
TCPProtocolStackData & stack_data [[maybe_unused]];
};
}

View File

@ -0,0 +1,64 @@
#pragma once
#include <Poco/Logger.h>
#include <Poco/Net/TCPServerConnection.h>
#include <Poco/Net/NetException.h>
#include <Poco/Util/LayeredConfiguration.h>
#include <Server/TLSHandler.h>
#include <Server/IServer.h>
#include <Server/TCPServer.h>
#include <Server/TCPProtocolStackData.h>
#include <Common/logger_useful.h>
namespace DB
{
class TLSHandlerFactory : public TCPServerConnectionFactory
{
private:
IServer & server;
Poco::Logger * log;
std::string conf_name;
class DummyTCPHandler : public Poco::Net::TCPServerConnection
{
public:
using Poco::Net::TCPServerConnection::TCPServerConnection;
void run() override {}
};
public:
explicit TLSHandlerFactory(IServer & server_, const std::string & conf_name_)
: server(server_), log(&Poco::Logger::get("TLSHandlerFactory")), conf_name(conf_name_)
{
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer & tcp_server) override
{
TCPProtocolStackData stack_data;
return createConnection(socket, tcp_server, stack_data);
}
Poco::Net::TCPServerConnection * createConnection(const Poco::Net::StreamSocket & socket, TCPServer &/* tcp_server*/, TCPProtocolStackData & stack_data) override
{
try
{
LOG_TRACE(log, "TCP Request. Address: {}", socket.peerAddress().toString());
return new TLSHandler(
socket,
server.config().getString(conf_name + ".privateKeyFile", ""),
server.config().getString(conf_name + ".certificateFile", ""),
stack_data);
}
catch (const Poco::Net::NetException &)
{
LOG_TRACE(log, "TCP Request. Client is not connected (most likely RST packet was sent).");
return new DummyTCPHandler(socket);
}
}
};
}

View File

@ -220,8 +220,11 @@ getColumnsForNewDataPart(
if (!isWidePart(source_part))
return {updated_header.getNamesAndTypesList(), new_serialization_infos};
Names source_column_names = source_part->getColumns().getNames();
NameSet source_columns_name_set(source_column_names.begin(), source_column_names.end());
const auto & source_columns = source_part->getColumns();
std::unordered_map<String, DataTypePtr> source_columns_name_to_type;
for (const auto & it : source_columns)
source_columns_name_to_type[it.name] = it.type;
for (auto it = storage_columns.begin(); it != storage_columns.end();)
{
if (updated_header.has(it->name))
@ -233,14 +236,25 @@ getColumnsForNewDataPart(
}
else
{
if (!source_columns_name_set.contains(it->name))
auto source_col = source_columns_name_to_type.find(it->name);
if (source_col == source_columns_name_to_type.end())
{
/// Source part doesn't have column but some other column
/// was renamed to it's name.
auto renamed_it = renamed_columns_to_from.find(it->name);
if (renamed_it != renamed_columns_to_from.end()
&& source_columns_name_set.contains(renamed_it->second))
++it;
if (renamed_it != renamed_columns_to_from.end())
{
source_col = source_columns_name_to_type.find(renamed_it->second);
if (source_col == source_columns_name_to_type.end())
it = storage_columns.erase(it);
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
++it;
}
}
else
it = storage_columns.erase(it);
}
@ -262,7 +276,12 @@ getColumnsForNewDataPart(
if (!renamed_columns_to_from.contains(it->name) && (was_renamed || was_removed))
it = storage_columns.erase(it);
else
{
/// Take a type from source part column.
/// It may differ from column type in storage.
it->type = source_col->second;
++it;
}
}
}
}

View File

@ -315,7 +315,12 @@ void ReplicatedMergeTreeSink::commitPart(
DataPartStorageBuilderPtr builder,
size_t replicas_num)
{
metadata_snapshot->check(part->getColumns());
/// It is possible that we alter a part with different types of source columns.
/// In this case, if column was not altered, the result type will be different with what we have in metadata.
/// For now, consider it is ok. See 02461_alter_update_respect_part_column_type_bug for an example.
///
/// metadata_snapshot->check(part->getColumns());
assertSessionIsNotExpired(zookeeper);
String temporary_part_relative_path = part->data_part_storage->getPartDirectory();

View File

@ -26,6 +26,8 @@
#include <Processors/QueryPlan/ExpressionStep.h>
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/PartialSortingTransform.h>
#include <Processors/Sinks/SinkToStorage.h>
#include <Processors/ISource.h>
#include <Processors/QueryPlan/QueryPlan.h>
@ -334,6 +336,14 @@ void StorageBuffer::read(
pipes_from_buffers.emplace_back(std::make_shared<BufferSource>(column_names, buf, storage_snapshot));
pipe_from_buffers = Pipe::unitePipes(std::move(pipes_from_buffers));
if (query_info.getInputOrderInfo())
{
/// Each buffer has one block, and it not guaranteed that rows in each block are sorted by order keys
pipe_from_buffers.addSimpleTransform([&](const Block & header)
{
return std::make_shared<PartialSortingTransform>(header, query_info.getInputOrderInfo()->sort_description_for_merging, 0);
});
}
}
if (pipe_from_buffers.empty())

View File

@ -8,7 +8,14 @@ DEFAULT_QUERY_TIMEOUT = 600
class Client:
def __init__(self, host, port=9000, command="/usr/bin/clickhouse-client"):
def __init__(
self,
host,
port=9000,
command="/usr/bin/clickhouse-client",
secure=False,
config=None,
):
self.host = host
self.port = port
self.command = [command]
@ -16,6 +23,11 @@ class Client:
if os.path.basename(command) == "clickhouse":
self.command.append("client")
if secure:
self.command.append("--secure")
if config is not None:
self.command += ["--config-file", config]
self.command += ["--host", self.host, "--port", str(self.port), "--stacktrace"]
def stacktraces_on_timeout_decorator(func):

View File

@ -0,0 +1,10 @@
<clickhouse>
<openSSL>
<client>
<verificationMode>none</verificationMode>
<invalidCertificateHandler>
<name>AcceptCertificateHandler</name>
</invalidCertificateHandler>
</client>
</openSSL>
</clickhouse>

View File

@ -0,0 +1,63 @@
<clickhouse>
<!-- Used with https_port and tcp_port_secure. Full ssl options list: https://github.com/ClickHouse-Extras/poco/blob/master/NetSSL_OpenSSL/include/Poco/Net/SSLManager.h#L71 -->
<openSSL>
<server> <!-- Used for https server AND secure tcp port -->
<!-- openssl req -subj "/CN=localhost" -new -newkey rsa:2048 -days 365 -nodes -x509 -keyout /etc/clickhouse-server/server.key -out /etc/clickhouse-server/server.crt -->
<certificateFile>/etc/clickhouse-server/config.d/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/config.d/server.key</privateKeyFile>
<verificationMode>none</verificationMode>
<loadDefaultCAFile>true</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
<listen_host>0.0.0.0</listen_host>
<protocols>
<tcp>
<type>tcp</type>
<host>0.0.0.0</host>
<port>9000</port>
<description>native protocol (tcp)</description>
</tcp>
<tcp_secure>
<type>tls</type>
<impl>tcp</impl>
<port>9440</port>
<description>secure native protocol (tcp_secure)</description>
</tcp_secure>
<tcp_endpoint>
<impl>tcp</impl>
<host>0.0.0.0</host>
<port>9001</port>
<description>native protocol endpoint (tcp)</description>
</tcp_endpoint>
<tcp_proxy>
<type>proxy1</type>
<impl>tcp</impl>
<port>9100</port>
<description>native protocol with PROXYv1 (tcp_proxy)</description>
</tcp_proxy>
<http>
<type>http</type>
<port>8123</port>
<description>http protocol</description>
</http>
<https>
<type>tls</type>
<impl>http</impl>
<host>0.0.0.0</host>
<port>8443</port>
<description>https protocol</description>
</https>
<https_endpoint>
<impl>https</impl>
<port>8444</port>
<description>https protocol endpoint</description>
</https_endpoint>
</protocols>
<!--tcp_port>9010</tcp_port-->
</clickhouse>

View File

@ -0,0 +1,18 @@
-----BEGIN CERTIFICATE-----
MIIC+zCCAeOgAwIBAgIJAIhI9ozZJ+TWMA0GCSqGSIb3DQEBCwUAMBQxEjAQBgNV
BAMMCWxvY2FsaG9zdDAeFw0xOTA0MjIwNDMyNTJaFw0yMDA0MjEwNDMyNTJaMBQx
EjAQBgNVBAMMCWxvY2FsaG9zdDCCASIwDQYJKoZIhvcNAQEBBQADggEPADCCAQoC
ggEBAK+wVUEdqF2uXvN0MJBgnAHyXi6JTi4p/F6igsrCjSNjJWzHH0vQmK8ujfcF
CkifW88i+W5eHctuEtQqNHK+t9x9YiZtXrj6m/XkOXs20mYgENSmbbbHbriTPnZB
zZrq6UqMlwIHNNAa+I3NMORQxVRaI0ybXnGVO5elr70xHpk03xL0JWKHpEqYp4db
2aBQgF6y3Ww4khxjIYqpUYXWXGFnVIRU7FKVEAM1xyKqvQzXjQ5sVM/wyHknveEF
3b/X4ggN+KNl5KOc0cWDh1/XaatJAPaUUPqZcq76tynLbP64Xm3dxHcj+gtRkO67
ef6MSg6l63m3XQP6Qb+MIkd06OsCAwEAAaNQME4wHQYDVR0OBBYEFDmODTO8QLDN
ykR3x0LIOnjNhrKhMB8GA1UdIwQYMBaAFDmODTO8QLDNykR3x0LIOnjNhrKhMAwG
A1UdEwQFMAMBAf8wDQYJKoZIhvcNAQELBQADggEBAAwaiJc7uqEpnH3aukbftDwX
m8GfEnj1HVdgg+9GGNq+9rvUYBF6gdPmjRCX9dO0cclLFx8jc2org0rTSq9WoOhX
E6qL4Eqrmc5SE3Y9jZM0h6GRD4oXK014FmtZ3T6ddZU3dQLj3BS2r1XrvmubTvGN
ZuTJNY8nx8Hh6H5XINmsEjUF9E5hog+PwCE03xt2adIdYL+gsbxASeNYyeUFpZv5
zcXR3VoakBWnAaOVgCHq2qh96QAnL7ZKzFkGf/MdwV10KU3dmb+ICbQUUdf9Gc17
aaDCIRws312F433FdXBkGs2UkB7ZZme9dfn6O1QbeTNvex2VLMqYx/CTkfFbOQA=
-----END CERTIFICATE-----

View File

@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvQIBADANBgkqhkiG9w0BAQEFAASCBKcwggSjAgEAAoIBAQCvsFVBHahdrl7z
dDCQYJwB8l4uiU4uKfxeooLKwo0jYyVsxx9L0JivLo33BQpIn1vPIvluXh3LbhLU
KjRyvrfcfWImbV64+pv15Dl7NtJmIBDUpm22x264kz52Qc2a6ulKjJcCBzTQGviN
zTDkUMVUWiNMm15xlTuXpa+9MR6ZNN8S9CVih6RKmKeHW9mgUIBest1sOJIcYyGK
qVGF1lxhZ1SEVOxSlRADNcciqr0M140ObFTP8Mh5J73hBd2/1+IIDfijZeSjnNHF
g4df12mrSQD2lFD6mXKu+rcpy2z+uF5t3cR3I/oLUZDuu3n+jEoOpet5t10D+kG/
jCJHdOjrAgMBAAECggEARF66zrxb6RkSmmt8+rKeA6PuQu3sHsr4C1vyyjUr97l9
tvdGlpp20LWtSZQMjHZ3pARYTTsTHTeY3DgQcRcHNicVKx8k3ZepWeeW9vw+pL+V
zSt3RsoVrH6gsCSrfr4sS3aqzX9AbjwQvh48CJ3mLQ1m70kHV+xbZIh1+4pB/hyP
1wKyUE18ZkOptXvO/TtoHzLQCecpkXtWzmry1Eh2isvXA+NMrAtLibGsyM1mtm7i
5ozevzHabvvCDBEe+KgZdONgVhhhvm2eOd+/s4w3rw4ETud4fI/ZAJyWXhiIKFnA
VJbElWruSAoVBW7p2bsF5PbmVzvo8vXL+VylxYD+AQKBgQDhLoRKTVhNkn/QjKxq
sdOh+QZra0LzjVpAmkQzu7wZMSHEz9qePQciDQQrYKrmRF1vNcIRCVUTqWYheJ/1
lKRrCGa0ab6k96zkWMqLHD5u+UeJV7r1dJIx08ME9kNJ+x/XtB8klRIji16NiQUS
qc6p8z0M2AnbJzsRfWZRH8FeYwKBgQDHu8dzdtVGI7MtxfPOE/bfajiopDg8BdTC
pdug2T8XofRHRq7Q+0vYjTAZFT/slib91Pk6VvvPdo9VBZiL4omv4dAq6mOOdX/c
U14mJe1X5GCrr8ExZ8BfNJ3t/6sV1fcxyJwAw7iBguqxA2JqdM/wFk10K8XqvzVn
CD6O9yGt2QKBgFX1BMi8N538809vs41S7l9hCQNOQZNo/O+2M5yv6ECRkbtoQKKw
1x03bMUGNJaLuELweXE5Z8GGo5bZTe5X3F+DKHlr+DtO1C+ieUaa9HY2MAmMdLCn
2/qrREGLo+oEs4YKmuzC/taUp/ZNPKOAMISNdluFyFVg51pozPrgrVbTAoGBAKkE
LBl3O67o0t0vH8sJdeVFG8EJhlS0koBMnfgVHqC++dm+5HwPyvTrNQJkyv1HaqNt
r6FArkG3ED9gRuBIyT6+lctbIPgSUip9mbQqcBfqOCvQxGksZMur2ODncz09HLtS
CUFUXjOqNzOnq4ZuZu/Bz7U4vXiSaXxQq6+LTUKxAoGAFZU/qrI06XxnrE9A1X0W
l7DSkpZaDcu11NrZ473yONih/xOZNh4SSBpX8a7F6Pmh9BdtGqphML8NFPvQKcfP
b9H2iid2tc292uyrUEb5uTMmv61zoTwtitqLzO0+tS6PT3fXobX+eyeEWKzPBljL
HFtxG5CCXpkdnWRmaJnhTzA=
-----END PRIVATE KEY-----

View File

@ -0,0 +1,16 @@
<clickhouse>
<profiles>
<default>
</default>
</profiles>
<users>
<default>
<password></password>
<networks replace="replace">
<ip>::/0</ip>
</networks>
<profile>default</profile>
</default>
</users>
</clickhouse>

View File

@ -0,0 +1,94 @@
import ssl
import pytest
import os.path as p
import os
from helpers.cluster import ClickHouseCluster
from helpers.client import Client
import urllib.request, urllib.parse
import subprocess
import socket
SCRIPT_DIR = os.path.dirname(os.path.realpath(__file__))
cluster = ClickHouseCluster(__file__)
server = cluster.add_instance(
"server",
base_config_dir="configs",
main_configs=["configs/server.crt", "configs/server.key"],
)
@pytest.fixture(scope="module", autouse=True)
def setup_nodes():
try:
cluster.start()
yield cluster
finally:
cluster.shutdown()
def execute_query_https(host, port, query):
url = f"https://{host}:{port}/?query={urllib.parse.quote(query)}"
ctx = ssl.create_default_context()
ctx.check_hostname = False
ctx.verify_mode = ssl.CERT_NONE
request = urllib.request.Request(url)
response = urllib.request.urlopen(request, context=ctx).read()
return response.decode("utf-8")
def execute_query_http(host, port, query):
url = f"http://{host}:{port}/?query={urllib.parse.quote(query)}"
request = urllib.request.Request(url)
response = urllib.request.urlopen(request).read()
return response.decode("utf-8")
def netcat(hostname, port, content):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect((hostname, port))
s.sendall(content)
s.shutdown(socket.SHUT_WR)
data = []
while 1:
d = s.recv(1024)
if len(d) == 0:
break
data.append(d)
s.close()
return b"".join(data)
def test_connections():
client = Client(server.ip_address, 9000, command=cluster.client_bin_path)
assert client.query("SELECT 1") == "1\n"
client = Client(
server.ip_address,
9440,
command=cluster.client_bin_path,
secure=True,
config=f"{SCRIPT_DIR}/configs/client.xml",
)
assert client.query("SELECT 1") == "1\n"
client = Client(server.ip_address, 9001, command=cluster.client_bin_path)
assert client.query("SELECT 1") == "1\n"
assert execute_query_http(server.ip_address, 8123, "SELECT 1") == "1\n"
assert execute_query_https(server.ip_address, 8443, "SELECT 1") == "1\n"
assert execute_query_https(server.ip_address, 8444, "SELECT 1") == "1\n"
data = "PROXY TCP4 255.255.255.255 255.255.255.255 65535 65535\r\n\0\021ClickHouse client\024\r\253\251\003\0\007default\0\004\001\0\001\0\0\t0.0.0.0:0\001\tmilovidov\021milovidov-desktop\vClickHouse \024\r\253\251\003\0\001\0\0\0\002\001\025SELECT 'Hello, world'\002\0\247\203\254l\325\\z|\265\254F\275\333\206\342\024\202\024\0\0\0\n\0\0\0\240\01\0\02\377\377\377\377\0\0\0"
assert (
netcat(server.ip_address, 9100, bytearray(data, "latin-1")).find(
bytearray("Hello, world", "latin-1")
)
>= 0
)

View File

@ -11,5 +11,6 @@ alter table enum_alter_issue detach partition id 'all';
alter table enum_alter_issue modify column a Enum8('one' = 1, 'two' = 2, 'three' = 3);
insert into enum_alter_issue values ('one', 1), ('two', 1);
alter table enum_alter_issue attach partition id 'all'; -- {serverError TYPE_MISMATCH}
alter table enum_alter_issue attach partition id 'all';
select * from enum_alter_issue;
drop table enum_alter_issue;

View File

@ -0,0 +1,18 @@
#!/usr/bin/env bash
# Tags: no-fasttest
CUR_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CUR_DIR"/../shell_config.sh
$CLICKHOUSE_CLIENT -q "insert into function file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data2.jsonl') select NULL as x SETTINGS engine_file_truncate_on_insert = 1";
$CLICKHOUSE_CLIENT -q "insert into function file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data3.jsonl') select * from numbers(0) SETTINGS engine_file_truncate_on_insert = 1";
$CLICKHOUSE_CLIENT -q "insert into function file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data4.jsonl') select 1 as x SETTINGS engine_file_truncate_on_insert = 1";
$CLICKHOUSE_CLIENT -q "select * from file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data*.jsonl') order by x";
$CLICKHOUSE_CLIENT -q "insert into function file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data4.jsonl', 'TSV') select 1 as x";
$CLICKHOUSE_CLIENT -q "insert into function file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data1.jsonl', 'TSV') select [1,2,3] as x SETTINGS engine_file_truncate_on_insert = 1";
$CLICKHOUSE_CLIENT -q "select * from file('${CLICKHOUSE_TEST_UNIQUE_NAME}_data*.jsonl') settings schema_inference_use_cache_for_file=0" 2>&1 | grep -F -q "INCORRECT_DATA" && echo "OK" || echo "FAIL";

View File

@ -1,11 +0,0 @@
-- Tags: no-fasttest, no-parallel
insert into function file('02267_data2.jsonl') select NULL as x;
insert into function file('02267_data3.jsonl') select * from numbers(0);
insert into function file('02267_data4.jsonl') select 1 as x;
select * from file('02267_data*.jsonl') order by x;
insert into function file('02267_data1.jsonl', 'TSV') select 1 as x;
insert into function file('02267_data1.jsonl', 'TSV') select [1,2,3] as x;
select * from file('02267_data*.jsonl') settings schema_inference_use_cache_for_file=0; --{serverError INCORRECT_DATA}

View File

@ -0,0 +1,5 @@
9
8
7
6
5

View File

@ -0,0 +1,13 @@
CREATE TABLE mytable_stored (`a` UInt8) ENGINE = MergeTree ORDER BY a;
CREATE TABLE mytable (`a` UInt8) ENGINE = Buffer(currentDatabase(), 'mytable_stored', 4, 600, 3600, 10, 100, 10000, 10000000);
INSERT INTO mytable VALUES (0);
INSERT INTO mytable VALUES (1);
INSERT INTO mytable VALUES (2);
INSERT INTO mytable VALUES (3);
INSERT INTO mytable VALUES (4);
INSERT INTO mytable VALUES (5);
INSERT INTO mytable VALUES (6);
INSERT INTO mytable VALUES (7);
INSERT INTO mytable VALUES (8);
INSERT INTO mytable VALUES (9);
SELECT a FROM mytable ORDER BY a DESC LIMIT 5;

View File

@ -0,0 +1,9 @@
1 one test1
one one test1
one one test
one one test
-----
1 one test1
one one test1
one one test
one one test

View File

@ -0,0 +1,94 @@
drop table if exists src;
create table src( A Int64, B String, C String) Engine=MergeTree order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values(1, 'one', 'test');
alter table src detach partition tuple();
alter table src modify column B Nullable(String);
alter table src attach partition tuple();
alter table src update C = 'test1' where 1 settings mutations_sync=2;
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=MergeTree order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column A LowCardinality(String);
alter table src attach partition tuple();
alter table src update C = 'test1' where 1 settings mutations_sync=2;
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=MergeTree order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column A LowCardinality(String);
alter table src attach partition tuple();
alter table src modify column C LowCardinality(String);
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=MergeTree order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column B Nullable(String);
alter table src attach partition tuple();
alter table src rename column B to D;
select * from src;
select '-----';
drop table if exists src;
create table src( A Int64, B String, C String) Engine=ReplicatedMergeTree('/clickhouse/{database}/test/src1', '1') order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values(1, 'one', 'test');
alter table src detach partition tuple();
alter table src modify column B Nullable(String);
alter table src attach partition tuple();
alter table src update C = 'test1' where 1 settings mutations_sync=2;
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=ReplicatedMergeTree('/clickhouse/{database}/test/src2', '1') order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column A LowCardinality(String);
alter table src attach partition tuple();
alter table src update C = 'test1' where 1 settings mutations_sync=2;
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=ReplicatedMergeTree('/clickhouse/{database}/test/src3', '1') order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column A LowCardinality(String);
alter table src attach partition tuple();
alter table src modify column C LowCardinality(String);
select * from src;
drop table if exists src;
create table src( A String, B String, C String) Engine=ReplicatedMergeTree('/clickhouse/{database}/test/src4', '1') order by A SETTINGS min_bytes_for_wide_part=0;
insert into src values('one', 'one', 'test');
alter table src detach partition tuple();
alter table src modify column B Nullable(String);
alter table src attach partition tuple();
alter table src rename column B to D;
select * from src;