Merge branch 'master' into stress-test

Author: mergify[bot] (committed by GitHub)
Date:   2022-03-09 16:22:45 +00:00
Commit: 78c8029aeb
115 changed files with 1491 additions and 648 deletions

@ -4,7 +4,7 @@ FROM ubuntu:20.04
ARG apt_archive="http://archive.ubuntu.com"
RUN sed -i "s|http://archive.ubuntu.com|$apt_archive|g" /etc/apt/sources.list
ARG repository="deb https://repo.clickhouse.com/deb/stable/ main/"
ARG repository="deb https://packages.clickhouse.com/deb stable main"
ARG version=22.1.1.*
# set non-empty deb_location_url url to create a docker image
@ -58,7 +58,7 @@ RUN groupadd -r clickhouse --gid=101 \
wget \
tzdata \
&& mkdir -p /etc/apt/sources.list.d \
&& apt-key adv --keyserver keyserver.ubuntu.com --recv E0C56BD4 \
&& apt-key adv --keyserver keyserver.ubuntu.com --recv 8919F6BD2B48D754 \
&& echo $repository > /etc/apt/sources.list.d/clickhouse.list \
&& if [ -n "$deb_location_url" ]; then \
echo "installing from custom url with deb packages: $deb_location_url" \

@ -42,6 +42,9 @@ COPY prepare_hive_data.sh /
COPY demo_data.txt /
ENV PATH=/apache-hive-2.3.9-bin/bin:/hadoop-3.1.0/bin:/hadoop-3.1.0/sbin:$PATH
RUN service ssh start && sed s/HOSTNAME/$HOSTNAME/ /hadoop-3.1.0/etc/hadoop/core-site.xml.template > /hadoop-3.1.0/etc/hadoop/core-site.xml && hdfs namenode -format
RUN apt install -y python3 python3-pip
RUN pip3 install flask requests
COPY http_api_server.py /
COPY start.sh /

@ -0,0 +1,70 @@
import os
import subprocess
import datetime
from flask import Flask, flash, request, redirect, url_for
def run_command(command, wait=False):
print("{} - execute shell command:{}".format(datetime.datetime.now(), command))
lines = []
p = subprocess.Popen(command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True)
if wait:
for l in iter(p.stdout.readline, b''):
lines.append(l)
p.poll()
return (lines, p.returncode)
else:
return(iter(p.stdout.readline, b''), 0)
UPLOAD_FOLDER = './'
ALLOWED_EXTENSIONS = {'txt', 'sh'}
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
@app.route('/')
def hello_world():
return 'Hello World'
def allowed_file(filename):
return '.' in filename and \
filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
@app.route('/upload', methods=['GET', 'POST'])
def upload_file():
if request.method == 'POST':
# check if the post request has the file part
if 'file' not in request.files:
flash('No file part')
return redirect(request.url)
file = request.files['file']
# If the user does not select a file, the browser submits an
# empty file without a filename.
if file.filename == '':
flash('No selected file')
return redirect(request.url)
if file and allowed_file(file.filename):
filename = file.filename
file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
return redirect(url_for('upload_file', name=filename))
return '''
<!doctype html>
<title>Upload new File</title>
<h1>Upload new File</h1>
<form method=post enctype=multipart/form-data>
<input type=file name=file>
<input type=submit value=Upload>
</form>
'''
@app.route('/run', methods=['GET', 'POST'])
def parse_request():
data = request.data # data is empty
run_command(data, wait=True)
return 'Ok'
if __name__ == '__main__':
app.run(port=5011)
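A hedged usage sketch for the helper API above (the port comes from app.run; prepare_hive_data.sh is the script copied into the image and serves only as an example payload):
# upload a script into UPLOAD_FOLDER via the multipart "file" field
curl -F "file=@prepare_hive_data.sh" http://localhost:5011/upload
# POST a shell command as the raw request body; it is forwarded to run_command()
curl -X POST --data "bash /prepare_hive_data.sh" http://localhost:5011/run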

@ -2,5 +2,9 @@
hive -e "create database test"
hive -e "create table test.demo(id string, score int) PARTITIONED BY(day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'; create table test.demo_orc(id string, score int) PARTITIONED BY(day string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'; "
hive -e "create table test.parquet_demo(id string, score int) PARTITIONED BY(day string, hour string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'"
hive -e "create table test.demo_text(id string, score int, day string)row format delimited fields terminated by ','; load data local inpath '/demo_data.txt' into table test.demo_text "
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;insert into test.demo partition(day) select * from test.demo_text; insert into test.demo_orc partition(day) select * from test.demo_text"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;insert into test.demo partition(day) select * from test.demo_text; insert into test.demo_orc partition(day) select * from test.demo_text"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;insert into test.parquet_demo partition(day, hour) select id, score, day, '00' as hour from test.demo;"
hive -e "set hive.exec.dynamic.partition.mode=nonstrict;insert into test.parquet_demo partition(day, hour) select id, score, day, '01' as hour from test.demo;"

@ -1,6 +1,5 @@
service ssh start
sed s/HOSTNAME/$HOSTNAME/ /hadoop-3.1.0/etc/hadoop/core-site.xml.template > /hadoop-3.1.0/etc/hadoop/core-site.xml
hadoop namenode -format
start-all.sh
service mysql start
mysql -u root -e "CREATE USER \"test\"@\"localhost\" IDENTIFIED BY \"test\""
@ -9,4 +8,4 @@ schematool -initSchema -dbType mysql
#nohup hiveserver2 &
nohup hive --service metastore &
bash /prepare_hive_data.sh
while true; do sleep 1000; done
python3 http_api_server.py

@ -1,11 +1,11 @@
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 8919F6BD2B48D754
echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee \
echo "deb https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
sudo service clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.
clickhouse-client # or "clickhouse-client --password" if you've set up a password.

@ -0,0 +1,11 @@
sudo apt-get install apt-transport-https ca-certificates dirmngr
sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv E0C56BD4
echo "deb https://repo.clickhouse.com/deb/stable/ main/" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
sudo apt-get install -y clickhouse-server clickhouse-client
sudo service clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.

@ -1,7 +1,6 @@
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/clickhouse.repo
sudo yum install clickhouse-server clickhouse-client
sudo yum install -y yum-utils
sudo yum-config-manager --add-repo https://packages.clickhouse.com/rpm/clickhouse.repo
sudo yum install -y clickhouse-server clickhouse-client
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.

@ -0,0 +1,7 @@
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/clickhouse.repo
sudo yum install clickhouse-server clickhouse-client
sudo /etc/init.d/clickhouse-server start
clickhouse-client # or "clickhouse-client --password" if you set up a password.

@ -1,19 +1,20 @@
export LATEST_VERSION=$(curl -s https://repo.clickhouse.com/tgz/stable/ | \
LATEST_VERSION=$(curl -s https://packages.clickhouse.com/tgz/stable/ | \
grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | sort -V -r | head -n 1)
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz
export LATEST_VERSION
curl -O "https://packages.clickhouse.com/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz"
curl -O "https://packages.clickhouse.com/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz"
curl -O "https://packages.clickhouse.com/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz"
curl -O "https://packages.clickhouse.com/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz"
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf "clickhouse-common-static-$LATEST_VERSION.tgz"
sudo "clickhouse-common-static-$LATEST_VERSION/install/doinst.sh"
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf "clickhouse-common-static-dbg-$LATEST_VERSION.tgz"
sudo "clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh"
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
tar -xzvf "clickhouse-server-$LATEST_VERSION.tgz"
sudo "clickhouse-server-$LATEST_VERSION/install/doinst.sh"
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
tar -xzvf "clickhouse-client-$LATEST_VERSION.tgz"
sudo "clickhouse-client-$LATEST_VERSION/install/doinst.sh"

@ -0,0 +1,19 @@
export LATEST_VERSION=$(curl -s https://repo.clickhouse.com/tgz/stable/ | \
grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | sort -V -r | head -n 1)
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh

@ -27,9 +27,17 @@ It is recommended to use official pre-compiled `deb` packages for Debian or Ubun
{% include 'install/deb.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing deb-packages</summary>
``` bash
{% include 'install/deb_repo.sh' %}
```
</details>
You can replace `stable` with `lts` or `testing` to use different [release trains](../faq/operations/production.md) based on your needs.
You can also download and install packages manually from [here](https://repo.clickhouse.com/deb/stable/main/).
You can also download and install packages manually from [here](https://packages.clickhouse.com/deb/pool/stable).
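For illustration, a minimal sketch of pointing the source list at a different release train (the `lts` suite name is assumed to be published alongside `stable`, mirroring the repository line used in the Dockerfile above):

``` bash
echo "deb https://packages.clickhouse.com/deb lts main" | sudo tee \
    /etc/apt/sources.list.d/clickhouse.list
sudo apt-get update
```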
#### Packages {#packages}
@ -49,11 +57,17 @@ It is recommended to use official pre-compiled `rpm` packages for CentOS, RedHat
First, you need to add the official repository:
``` bash
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
{% include 'install/rpm.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing rpm-packages</summary>
``` bash
{% include 'install/rpm_repo.sh' %}
```
</details>
If you want to use the most recent version, replace `stable` with `testing` (this is recommended for your testing environments). `prestable` is sometimes also available.
Then run these commands to install packages:
@ -62,36 +76,27 @@ Then run these commands to install packages:
sudo yum install clickhouse-server clickhouse-client
```
You can also download and install packages manually from [here](https://repo.clickhouse.com/rpm/stable/x86_64).
You can also download and install packages manually from [here](https://packages.clickhouse.com/rpm/stable).
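As a sketch, a non-default repository from the `clickhouse.repo` file added in this commit can be selected with standard yum flags (the repo ids are assumed to match that file):

``` bash
sudo yum install -y --enablerepo=clickhouse-testing clickhouse-server clickhouse-client
```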
### From Tgz Archives {#from-tgz-archives}
It is recommended to use official pre-compiled `tgz` archives for all Linux distributions, where installation of `deb` or `rpm` packages is not possible.
The required version can be downloaded with `curl` or `wget` from repository https://repo.clickhouse.com/tgz/.
The required version can be downloaded with `curl` or `wget` from repository https://packages.clickhouse.com/tgz/.
After that downloaded archives should be unpacked and installed with installation scripts. Example for the latest stable version:
``` bash
export LATEST_VERSION=`curl https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | grep stable | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1`
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
{% include 'install/tgz.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing tgz archives</summary>
``` bash
{% include 'install/tgz_repo.sh' %}
```
</details>
For production environments, it's recommended to use the latest `stable`-version. You can find its number on the GitHub page https://github.com/ClickHouse/ClickHouse/tags with the postfix `-stable`.
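As an illustrative sketch (it mirrors the removed inline snippet earlier in this file), the latest stable version number can also be pulled from the GitHub API:

``` bash
curl -s https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | \
    grep stable | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1
```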
### From Docker Image {#from-docker-image}

@ -1,11 +1,11 @@
---
toc_priority: 59
toc_title: Yandex.Metrica Dictionaries
toc_title: Embedded Dictionaries
---
# Functions for Working with Yandex.Metrica Dictionaries {#functions-for-working-with-yandex-metrica-dictionaries}
# Functions for Working with Embedded Dictionaries
In order for the functions below to work, the server config must specify the paths and addresses for getting all the Yandex.Metrica dictionaries. The dictionaries are loaded at the first call of any of these functions. If the reference lists can't be loaded, an exception is thrown.
In order for the functions below to work, the server config must specify the paths and addresses for getting all the embedded dictionaries. The dictionaries are loaded at the first call of any of these functions. If the reference lists can't be loaded, an exception is thrown.
For information about creating reference lists, see the section “Dictionaries”.
@ -33,7 +33,7 @@ regionToCountry(RegionID, 'ua') - Uses the dictionary for the 'ua' key: /opt/g
### regionToCity(id\[, geobase\]) {#regiontocityid-geobase}
Accepts a UInt32 number, the region ID from the Yandex geobase. If this region is a city or part of a city, it returns the region ID for the appropriate city. Otherwise, returns 0.
Accepts a UInt32 number, the region ID from the geobase. If this region is a city or part of a city, it returns the region ID for the appropriate city. Otherwise, returns 0.
### regionToArea(id\[, geobase\]) {#regiontoareaid-geobase}
@ -117,7 +117,7 @@ regionToTopContinent(id[, geobase])
**Arguments**
- `id` — Region ID from the Yandex geobase. [UInt32](../../sql-reference/data-types/int-uint.md).
- `id` — Region ID from the geobase. [UInt32](../../sql-reference/data-types/int-uint.md).
- `geobase` — Dictionary key. See [Multiple Geobases](#multiple-geobases). [String](../../sql-reference/data-types/string.md). Optional.
**Returned value**
@ -132,7 +132,7 @@ Type: `UInt32`.
Gets the population for a region.
The population can be recorded in files with the geobase. See the section “External dictionaries”.
If the population is not recorded for the region, it returns 0.
In the Yandex geobase, the population might be recorded for child regions, but not for parent regions.
In the geobase, the population might be recorded for child regions, but not for parent regions.
### regionIn(lhs, rhs\[, geobase\]) {#regioninlhs-rhs-geobase}
@ -141,12 +141,12 @@ The relationship is reflexive: any region also belongs to itself.
### regionHierarchy(id\[, geobase\]) {#regionhierarchyid-geobase}
Accepts a UInt32 number, the region ID from the Yandex geobase. Returns an array of region IDs consisting of the passed region and all parents along the chain.
Accepts a UInt32 number, the region ID from the geobase. Returns an array of region IDs consisting of the passed region and all parents along the chain.
Example: `regionHierarchy(toUInt32(213)) = [213,1,3,225,10001,10000]`.
### regionToName(id\[, lang\]) {#regiontonameid-lang}
Accepts a UInt32 number, the region ID from the Yandex geobase. A string with the name of the language can be passed as a second argument. Supported languages are: ru, en, ua, uk, by, kz, tr. If the second argument is omitted, the language ru is used. If the language is not supported, an exception is thrown. Returns a string, the name of the region in the corresponding language. If the region with the specified ID does not exist, an empty string is returned.
Accepts a UInt32 number, the region ID from the geobase. A string with the name of the language can be passed as a second argument. Supported languages are: ru, en, ua, uk, by, kz, tr. If the second argument is omitted, the language ru is used. If the language is not supported, an exception is thrown. Returns a string, the name of the region in the corresponding language. If the region with the specified ID does not exist, an empty string is returned.
`ua` and `uk` both mean Ukrainian.
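A minimal usage sketch chaining the functions above (it assumes the embedded geobase dictionaries are configured as described; 213 is the region ID from the earlier example):

``` bash
clickhouse-client --query "SELECT regionToName(regionToCountry(toUInt32(213)), 'en')"
```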

@ -11,7 +11,7 @@ ClickHouse supports the following syntax variants:
- `LIMIT [offset_value, ]n BY expressions`
- `LIMIT n OFFSET offset_value BY expressions`
During query processing, ClickHouse selects data ordered by sorting key. The sorting key is set explicitly using an [ORDER BY](../../../sql-reference/statements/select/order-by.md) clause or implicitly as a property of the table engine. Then ClickHouse applies `LIMIT n BY expressions` and returns the first `n` rows for each distinct combination of `expressions`. If `OFFSET` is specified, then for each data block that belongs to a distinct combination of `expressions`, ClickHouse skips `offset_value` number of rows from the beginning of the block and returns a maximum of `n` rows as a result. If `offset_value` is bigger than the number of rows in the data block, ClickHouse returns zero rows from the block.
During query processing, ClickHouse selects data ordered by sorting key. The sorting key is set explicitly using an [ORDER BY](order-by.md#select-order-by) clause or implicitly as a property of the table engine (row order is only guaranteed when using [ORDER BY](order-by.md#select-order-by), otherwise the row blocks will not be ordered due to multi-threading). Then ClickHouse applies `LIMIT n BY expressions` and returns the first `n` rows for each distinct combination of `expressions`. If `OFFSET` is specified, then for each data block that belongs to a distinct combination of `expressions`, ClickHouse skips `offset_value` number of rows from the beginning of the block and returns a maximum of `n` rows as a result. If `offset_value` is bigger than the number of rows in the data block, ClickHouse returns zero rows from the block.
!!! note "Note"
`LIMIT BY` is not related to [LIMIT](../../../sql-reference/statements/select/limit.md). They can both be used in the same query.
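To make the block-wise behaviour above concrete, a minimal sketch (the table name and values are invented for the example):

``` bash
clickhouse-client --multiquery --query "
    CREATE TABLE limit_by_demo (id UInt32, val UInt32) ENGINE = MergeTree ORDER BY id;
    INSERT INTO limit_by_demo VALUES (1, 10), (1, 11), (1, 12), (2, 20), (2, 21);
    -- at most 2 rows for each distinct id
    SELECT * FROM limit_by_demo ORDER BY id, val LIMIT 2 BY id;
    -- skip 1 row per id, then return at most 2 rows per id
    SELECT * FROM limit_by_demo ORDER BY id, val LIMIT 1, 2 BY id;
    DROP TABLE limit_by_demo;
"
```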

@ -28,9 +28,17 @@ Debian や Ubuntu 用にコンパイル済みの公式パッケージ `deb` を
{% include 'install/deb.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing deb-packages</summary>
``` bash
{% include 'install/deb_repo.sh' %}
```
</details>
最新版を使いたい場合は、`stable`を`testing`に置き換えてください。(テスト環境ではこれを推奨します)
同様に、[こちら](https://repo.clickhouse.com/deb/stable/main/)からパッケージをダウンロードして、手動でインストールすることもできます。
同様に、[こちら](https://packages.clickhouse.com/deb/pool/stable)からパッケージをダウンロードして、手動でインストールすることもできます。
#### パッケージ {#packages}
@ -46,11 +54,17 @@ CentOS、RedHat、その他すべてのrpmベースのLinuxディストリビュ
まず、公式リポジトリを追加する必要があります:
``` bash
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
{% include 'install/rpm.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing rpm-packages</summary>
``` bash
{% include 'install/rpm_repo.sh' %}
```
</details>
最新版を使いたい場合は `stable` を `testing` に置き換えてください。(テスト環境ではこれが推奨されています)。`prestable` もしばしば同様に利用できます。
そして、以下のコマンドを実行してパッケージをインストールします:
@ -59,35 +73,26 @@ sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
sudo yum install clickhouse-server clickhouse-client
```
同様に、[こちら](https://repo.clickhouse.com/rpm/stable/x86_64) からパッケージをダウンロードして、手動でインストールすることもできます。
同様に、[こちら](https://packages.clickhouse.com/rpm/stable) からパッケージをダウンロードして、手動でインストールすることもできます。
### Tgzアーカイブから {#from-tgz-archives}
すべての Linux ディストリビューションで、`deb` や `rpm` パッケージがインストールできない場合は、公式のコンパイル済み `tgz` アーカイブを使用することをお勧めします。
必要なバージョンは、リポジトリ https://repo.clickhouse.com/tgz/ から `curl` または `wget` でダウンロードできます。その後、ダウンロードしたアーカイブを解凍し、インストールスクリプトでインストールしてください。最新版の例は以下です:
必要なバージョンは、リポジトリ https://packages.clickhouse.com/tgz/ から `curl` または `wget` でダウンロードできます。その後、ダウンロードしたアーカイブを解凍し、インストールスクリプトでインストールしてください。最新版の例は以下です:
``` bash
export LATEST_VERSION=`curl https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1`
curl -O https://repo.clickhouse.com/tgz/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-client-$LATEST_VERSION.tgz
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
{% include 'install/tgz.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing tgz archives</summary>
``` bash
{% include 'install/tgz_repo.sh' %}
```
</details>
本番環境では、最新の `stable` バージョンを使うことをお勧めします。GitHub のページ https://github.com/ClickHouse/ClickHouse/tags で 接尾辞 `-stable` となっているバージョン番号として確認できます。
### Dockerイメージから {#from-docker-image}

@ -27,11 +27,17 @@ grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not su
{% include 'install/deb.sh' %}
```
Также эти пакеты можно скачать и установить вручную отсюда: https://repo.clickhouse.com/deb/stable/main/.
<details markdown="1">
<summary>Устаревший способ установки deb-пакетов</summary>
``` bash
{% include 'install/deb_repo.sh' %}
```
</details>
Чтобы использовать различные [версии ClickHouse](../faq/operations/production.md) в зависимости от ваших потребностей, вы можете заменить `stable` на `lts` или `testing`.
Также вы можете вручную скачать и установить пакеты из [репозитория](https://repo.clickhouse.com/deb/stable/main/).
Также вы можете вручную скачать и установить пакеты из [репозитория](https://packages.clickhouse.com/deb/pool/stable).
#### Пакеты {#packages}
@ -51,11 +57,17 @@ grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not su
Сначала нужно подключить официальный репозиторий:
``` bash
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
{% include 'install/rpm.sh' %}
```
<details markdown="1">
<summary>Устаревший способ установки rpm-пакетов</summary>
``` bash
{% include 'install/rpm_repo.sh' %}
```
</details>
Для использования наиболее свежих версий нужно заменить `stable` на `testing` (рекомендуется для тестовых окружений). Также иногда доступен `prestable`.
Для, собственно, установки пакетов необходимо выполнить следующие команды:
@ -64,36 +76,27 @@ sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
sudo yum install clickhouse-server clickhouse-client
```
Также есть возможность установить пакеты вручную, скачав отсюда: https://repo.clickhouse.com/rpm/stable/x86_64.
Также есть возможность установить пакеты вручную, скачав отсюда: https://packages.clickhouse.com/rpm/stable.
### Из Tgz архивов {#from-tgz-archives}
Команда ClickHouse в Яндексе рекомендует использовать предкомпилированные бинарники из `tgz` архивов для всех дистрибутивов, где невозможна установка `deb` и `rpm` пакетов.
Интересующую версию архивов можно скачать вручную с помощью `curl` или `wget` из репозитория https://repo.clickhouse.com/tgz/.
Интересующую версию архивов можно скачать вручную с помощью `curl` или `wget` из репозитория https://packages.clickhouse.com/tgz/.
После этого архивы нужно распаковать и воспользоваться скриптами установки. Пример установки самой свежей версии:
``` bash
export LATEST_VERSION=`curl https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1`
curl -O https://repo.clickhouse.com/tgz/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/clickhouse-client-$LATEST_VERSION.tgz
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
{% include 'install/tgz.sh' %}
```
<details markdown="1">
<summary>Устаревший способ установки из архивов tgz</summary>
``` bash
{% include 'install/tgz_repo.sh' %}
```
</details>
Для production окружений рекомендуется использовать последнюю `stable`-версию. Её номер также можно найти на GitHub на вкладке https://github.com/ClickHouse/ClickHouse/tags c постфиксом `-stable`.
### Из Docker образа {#from-docker-image}

@ -11,7 +11,7 @@ ClickHouse поддерживает следующий синтаксис:
- `LIMIT [offset_value, ]n BY expressions`
- `LIMIT n OFFSET offset_value BY expressions`
Во время обработки запроса, ClickHouse выбирает данные, упорядоченные по ключу сортировки. Ключ сортировки задаётся явно в секции [ORDER BY](order-by.md#select-order-by) или неявно в свойствах движка таблицы. Затем ClickHouse применяет `LIMIT n BY expressions` и возвращает первые `n` для каждой отличной комбинации `expressions`. Если указан `OFFSET`, то для каждого блока данных, который принадлежит отдельной комбинации `expressions`, ClickHouse отступает `offset_value` строк от начала блока и возвращает не более `n`. Если `offset_value` больше, чем количество строк в блоке данных, ClickHouse не возвращает ни одной строки.
Во время обработки запроса, ClickHouse выбирает данные, упорядоченные по ключу сортировки. Ключ сортировки задаётся явно в секции [ORDER BY](order-by.md#select-order-by) или неявно в свойствах движка таблицы (порядок строк гарантирован только при использовании [ORDER BY](order-by.md#select-order-by), в ином случае блоки строк не будут упорядочены из-за многопоточной обработки). Затем ClickHouse применяет `LIMIT n BY expressions` и возвращает первые `n` для каждой отличной комбинации `expressions`. Если указан `OFFSET`, то для каждого блока данных, который принадлежит отдельной комбинации `expressions`, ClickHouse отступает `offset_value` строк от начала блока и возвращает не более `n`. Если `offset_value` больше, чем количество строк в блоке данных, ClickHouse не возвращает ни одной строки.
`LIMIT BY` не связана с секцией `LIMIT`. Их можно использовать в одном запросе.

@ -38,5 +38,46 @@ CREATE TABLE test
ENGINE = EmbeddedRocksDB
PRIMARY KEY key
```
## 指标
还有一个`system.rocksdb` 表, 公开rocksdb的统计信息:
```sql
SELECT
name,
value
FROM system.rocksdb
┌─name──────────────────────┬─value─┐
│ no.file.opens │ 1 │
│ number.block.decompressed │ 1 │
└───────────────────────────┴───────┘
```
## 配置
你能修改任何[rocksdb options](https://github.com/facebook/rocksdb/wiki/Option-String-and-Option-Map) 配置,使用配置文件:
```xml
<rocksdb>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
<tables>
<table>
<name>TABLE</name>
<options>
<max_background_jobs>8</max_background_jobs>
</options>
<column_family_options>
<num_levels>2</num_levels>
</column_family_options>
</table>
</tables>
</rocksdb>
```
[原始文章](https://clickhouse.com/docs/en/engines/table-engines/integrations/embedded-rocksdb/) <!--hide-->

@ -27,9 +27,17 @@ $ grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not
{% include 'install/deb.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing deb-packages</summary>
``` bash
{% include 'install/deb_repo.sh' %}
```
</details>
如果您想使用最新的版本,请用`testing`替代`stable`(我们只推荐您用于测试环境)。
你也可以从这里手动下载安装包:[下载](https://repo.clickhouse.com/deb/stable/main/)。
你也可以从这里手动下载安装包:[下载](https://packages.clickhouse.com/deb/pool/stable)。
安装包列表:
@ -45,11 +53,17 @@ $ grep -q sse4_2 /proc/cpuinfo && echo "SSE 4.2 supported" || echo "SSE 4.2 not
首先,您需要添加官方存储库:
``` bash
sudo yum install yum-utils
sudo rpm --import https://repo.clickhouse.com/CLICKHOUSE-KEY.GPG
sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
{% include 'install/rpm.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing rpm-packages</summary>
``` bash
{% include 'install/rpm_repo.sh' %}
```
</details>
如果您想使用最新的版本,请用`testing`替代`stable`(我们只推荐您用于测试环境)。`prestable`有时也可用。
然后运行命令安装:
@ -58,37 +72,28 @@ sudo yum-config-manager --add-repo https://repo.clickhouse.com/rpm/stable/x86_64
sudo yum install clickhouse-server clickhouse-client
```
你也可以从这里手动下载安装包:[下载](https://repo.clickhouse.com/rpm/stable/x86_64)。
你也可以从这里手动下载安装包:[下载](https://packages.clickhouse.com/rpm/stable)。
### `Tgz`安装包 {#from-tgz-archives}
如果您的操作系统不支持安装`deb`或`rpm`包,建议使用官方预编译的`tgz`软件包。
所需的版本可以通过`curl`或`wget`从存储库`https://repo.clickhouse.com/tgz/`下载。
所需的版本可以通过`curl`或`wget`从存储库`https://packages.clickhouse.com/tgz/`下载。
下载后解压缩下载资源文件并使用安装脚本进行安装。以下是一个最新稳定版本的安装示例:
``` bash
export LATEST_VERSION=`curl https://api.github.com/repos/ClickHouse/ClickHouse/tags 2>/dev/null | grep stable | grep -Eo '[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+' | head -n 1`
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-common-static-dbg-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-server-$LATEST_VERSION.tgz
curl -O https://repo.clickhouse.com/tgz/stable/clickhouse-client-$LATEST_VERSION.tgz
tar -xzvf clickhouse-common-static-$LATEST_VERSION.tgz
sudo clickhouse-common-static-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-common-static-dbg-$LATEST_VERSION.tgz
sudo clickhouse-common-static-dbg-$LATEST_VERSION/install/doinst.sh
tar -xzvf clickhouse-server-$LATEST_VERSION.tgz
sudo clickhouse-server-$LATEST_VERSION/install/doinst.sh
sudo /etc/init.d/clickhouse-server start
tar -xzvf clickhouse-client-$LATEST_VERSION.tgz
sudo clickhouse-client-$LATEST_VERSION/install/doinst.sh
{% include 'install/tgz.sh' %}
```
<details markdown="1">
<summary>Deprecated Method for installing tgz archives</summary>
``` bash
{% include 'install/tgz_repo.sh' %}
```
</details>
对于生产环境,建议使用最新的`stable`版本。你可以在GitHub页面https://github.com/ClickHouse/ClickHouse/tags找到它,它以后缀`-stable`标志。
### `Docker`安装包 {#from-docker-image}

@ -1 +0,0 @@
../../../../en/sql-reference/statements/alter/row-policy.md

@ -0,0 +1,19 @@
---
toc_priority: 47
toc_title: 行策略
---
# 操作行策略 {#alter-row-policy-statement}
修改行策略.
语法:
``` sql
ALTER [ROW] POLICY [IF EXISTS] name1 [ON CLUSTER cluster_name1] ON [database1.]table1 [RENAME TO new_name1]
[, name2 [ON CLUSTER cluster_name2] ON [database2.]table2 [RENAME TO new_name2] ...]
[AS {PERMISSIVE | RESTRICTIVE}]
[FOR SELECT]
[USING {condition | NONE}][,...]
[TO {role [,...] | ALL | ALL EXCEPT role [,...]}]
```

@ -1 +0,0 @@
../../../../en/sql-reference/statements/alter/settings-profile.md

@ -0,0 +1,16 @@
---
toc_priority: 48
toc_title: 配置文件设置
---
## 更改配置文件设置 {#alter-settings-profile-statement}
更改配置文件设置。
语法:
``` sql
ALTER SETTINGS PROFILE [IF EXISTS] TO name1 [ON CLUSTER cluster_name1] [RENAME TO new_name1]
[, name2 [ON CLUSTER cluster_name2] [RENAME TO new_name2] ...]
[SETTINGS variable [= value] [MIN [=] min_value] [MAX [=] max_value] [READONLY|WRITABLE] | INHERIT 'profile_name'] [,...]
```

@ -11,7 +11,7 @@ ClickHouse支持以下语法变体:
- `LIMIT [offset_value, ]n BY expressions`
- `LIMIT n OFFSET offset_value BY expressions`
查询处理过程中ClickHouse会选择按排序键排序的数据。 排序键使用以下命令显式设置 [ORDER BY](../../../sql-reference/statements/select/order-by.md) 子句或隐式作为表引擎的属性。 然后ClickHouse应用 `LIMIT n BY expressions` 并返回第一 `n` 每个不同组合的行 `expressions`. 如果 `OFFSET` 被指定,则对于每个数据块属于一个不同的组合 `expressions`ClickHouse跳过 `offset_value` 从块开始的行数,并返回最大值 `n` 行的结果。 如果 `offset_value` 如果数据块中的行数大于数据块中的行数ClickHouse将从该块返回零行。
进行查询处理时ClickHouse选择按排序键排序的数据。排序键设置显式地使用一个[ORDER BY](order-by.md#select-order-by)条款或隐式属性表的引擎(行顺序只是保证在使用[ORDER BY](order-by.md#select-order-by),否则不会命令行块由于多线程)。然后ClickHouse应用`LIMIT n BY 表达式`,并为每个不同的`表达式`组合返回前n行。如果指定了`OFFSET`,那么对于每个属于不同`表达式`组合的数据块ClickHouse将跳过`offset_value`从块开始的行数,并最终返回最多`n`行的结果。如果`offset_value`大于数据块中的行数则ClickHouse从数据块中返回零行。
!!! note "注"
`LIMIT BY` 是不相关的 [LIMIT](../../../sql-reference/statements/select/limit.md). 它们都可以在同一个查询中使用。

@ -0,0 +1,31 @@
[clickhouse-stable]
name=ClickHouse - Stable Repository
baseurl=https://packages.clickhouse.com/rpm/stable/
gpgkey=https://packages.clickhouse.com/rpm/stable/repodata/repomd.xml.key
gpgcheck=0
repo_gpgcheck=1
enabled=0
[clickhouse-lts]
name=ClickHouse - LTS Repository
baseurl=https://packages.clickhouse.com/rpm/lts/
gpgkey=https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key
gpgcheck=0
repo_gpgcheck=1
enabled=0
[clickhouse-prestable]
name=ClickHouse - Pre-stable Repository
baseurl=https://packages.clickhouse.com/rpm/prestable/
gpgkey=https://packages.clickhouse.com/rpm/prestable/repodata/repomd.xml.key
gpgcheck=0
repo_gpgcheck=1
enabled=0
[clickhouse-testing]
name=ClickHouse - Testing Repository
baseurl=https://packages.clickhouse.com/rpm/testing/
gpgkey=https://packages.clickhouse.com/rpm/testing/repodata/repomd.xml.key
gpgcheck=0
repo_gpgcheck=1
enabled=1

@ -435,6 +435,8 @@ private:
Progress progress;
executor.setProgressCallback([&progress](const Progress & value) { progress.incrementPiecewiseAtomically(value); });
executor.sendQuery(ClientInfo::QueryKind::INITIAL_QUERY);
ProfileInfo info;
while (Block block = executor.read())
info.update(block);

@ -411,7 +411,8 @@ void LocalServer::setupUsers()
void LocalServer::connect()
{
connection_parameters = ConnectionParameters(config());
connection = LocalConnection::createConnection(connection_parameters, global_context, need_render_progress);
connection = LocalConnection::createConnection(
connection_parameters, global_context, need_render_progress, need_render_profile_events, server_display_name);
}

@ -144,7 +144,6 @@ list (APPEND dbms_sources
AggregateFunctions/AggregateFunctionState.cpp
AggregateFunctions/AggregateFunctionCount.cpp
AggregateFunctions/parseAggregateFunctionParameters.cpp)
list (APPEND dbms_headers
AggregateFunctions/IAggregateFunction.h
AggregateFunctions/IAggregateFunctionCombinator.h
@ -155,10 +154,25 @@ list (APPEND dbms_headers
AggregateFunctions/FactoryHelpers.h
AggregateFunctions/parseAggregateFunctionParameters.h)
list (APPEND dbms_sources TableFunctions/ITableFunction.cpp TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers TableFunctions/ITableFunction.h TableFunctions/TableFunctionFactory.h)
list (APPEND dbms_sources Dictionaries/DictionaryFactory.cpp Dictionaries/DictionarySourceFactory.cpp Dictionaries/DictionaryStructure.cpp Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers Dictionaries/DictionaryFactory.h Dictionaries/DictionarySourceFactory.h Dictionaries/DictionaryStructure.h Dictionaries/getDictionaryConfigurationFromAST.h)
list (APPEND dbms_sources
TableFunctions/ITableFunction.cpp
TableFunctions/TableFunctionView.cpp
TableFunctions/TableFunctionFactory.cpp)
list (APPEND dbms_headers
TableFunctions/ITableFunction.h
TableFunctions/TableFunctionView.h
TableFunctions/TableFunctionFactory.h)
list (APPEND dbms_sources
Dictionaries/DictionaryFactory.cpp
Dictionaries/DictionarySourceFactory.cpp
Dictionaries/DictionaryStructure.cpp
Dictionaries/getDictionaryConfigurationFromAST.cpp)
list (APPEND dbms_headers
Dictionaries/DictionaryFactory.h
Dictionaries/DictionarySourceFactory.h
Dictionaries/DictionaryStructure.h
Dictionaries/getDictionaryConfigurationFromAST.h)
if (NOT ENABLE_SSL)
list (REMOVE_ITEM clickhouse_common_io_sources Common/OpenSSLHelpers.cpp)
@ -253,18 +267,16 @@ if (TARGET ch_contrib::nuraft)
add_object_library(clickhouse_coordination Coordination)
endif()
set (DBMS_COMMON_LIBRARIES)
if (USE_STATIC_LIBRARIES OR NOT SPLIT_SHARED_LIBRARIES)
add_library (dbms STATIC ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide ${DBMS_COMMON_LIBRARIES})
target_link_libraries (dbms PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (dbms PRIVATE ch_contrib::jemalloc)
endif()
set (all_modules dbms)
else()
add_library (dbms SHARED ${dbms_headers} ${dbms_sources})
target_link_libraries (dbms PUBLIC ${all_modules} ${DBMS_COMMON_LIBRARIES})
target_link_libraries (dbms PUBLIC ${all_modules})
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::libdivide)
if (TARGET ch_contrib::jemalloc)
target_link_libraries (clickhouse_interpreters PRIVATE ch_contrib::jemalloc)

@ -873,7 +873,7 @@ void ClientBase::onProfileEvents(Block & block)
if (rows == 0)
return;
if (server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
if (getName() == "local" || server_revision >= DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
{
const auto & array_thread_id = typeid_cast<const ColumnUInt64 &>(*block.getByName("thread_id").column).getData();
const auto & names = typeid_cast<const ColumnString &>(*block.getByName("name").column);

@ -219,6 +219,7 @@ protected:
ProgressIndication progress_indication;
bool need_render_progress = true;
bool need_render_profile_events = true;
bool written_first_block = false;
size_t processed_rows = 0; /// How many rows have been read or written.

@ -6,6 +6,8 @@
#include <Processors/Executors/PushingAsyncPipelineExecutor.h>
#include <Storages/IStorage.h>
#include <Core/Protocol.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
namespace DB
@ -18,10 +20,12 @@ namespace ErrorCodes
extern const int NOT_IMPLEMENTED;
}
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_)
LocalConnection::LocalConnection(ContextPtr context_, bool send_progress_, bool send_profile_events_, const String & server_display_name_)
: WithContext(context_)
, session(getContext(), ClientInfo::Interface::LOCAL)
, send_progress(send_progress_)
, send_profile_events(send_profile_events_)
, server_display_name(server_display_name_)
{
/// Authenticate and create a context to execute queries.
session.authenticate("default", "", Poco::Net::SocketAddress{});
@ -58,6 +62,11 @@ void LocalConnection::updateProgress(const Progress & value)
state->progress.incrementPiecewiseAtomically(value);
}
void LocalConnection::getProfileEvents(Block & block)
{
ProfileEvents::getProfileEvents(server_display_name, state->profile_queue, block, last_sent_snapshots);
}
void LocalConnection::sendQuery(
const ConnectionTimeouts &,
const String & query,
@ -77,18 +86,23 @@ void LocalConnection::sendQuery(
if (!current_database.empty())
query_context->setCurrentDatabase(current_database);
CurrentThread::QueryScope query_scope_holder(query_context);
state.reset();
state.emplace();
state->query_id = query_id;
state->query = query;
state->query_scope_holder = std::make_unique<CurrentThread::QueryScope>(query_context);
state->stage = QueryProcessingStage::Enum(stage);
state->profile_queue = std::make_shared<InternalProfileEventsQueue>(std::numeric_limits<int>::max());
CurrentThread::attachInternalProfileEventsQueue(state->profile_queue);
if (send_progress)
state->after_send_progress.restart();
if (send_profile_events)
state->after_send_profile_events.restart();
next_packet_type.reset();
try
@ -231,6 +245,16 @@ bool LocalConnection::poll(size_t)
return true;
}
if (send_profile_events && (state->after_send_profile_events.elapsedMicroseconds() >= query_context->getSettingsRef().interactive_delay))
{
Block block;
state->after_send_profile_events.restart();
next_packet_type = Protocol::Server::ProfileEvents;
getProfileEvents(block);
state->block.emplace(std::move(block));
return true;
}
try
{
pollImpl();
@ -459,9 +483,14 @@ void LocalConnection::sendMergeTreeReadTaskResponse(const PartitionReadResponse
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Not implemented");
}
ServerConnectionPtr LocalConnection::createConnection(const ConnectionParameters &, ContextPtr current_context, bool send_progress)
ServerConnectionPtr LocalConnection::createConnection(
const ConnectionParameters &,
ContextPtr current_context,
bool send_progress,
bool send_profile_events,
const String & server_display_name)
{
return std::make_unique<LocalConnection>(current_context, send_progress);
return std::make_unique<LocalConnection>(current_context, send_progress, send_profile_events, server_display_name);
}

@ -5,6 +5,7 @@
#include <QueryPipeline/BlockIO.h>
#include <IO/TimeoutSetter.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/ColumnsDescription.h>
@ -29,6 +30,7 @@ struct LocalQueryState
std::unique_ptr<PullingAsyncPipelineExecutor> executor;
std::unique_ptr<PushingPipelineExecutor> pushing_executor;
std::unique_ptr<PushingAsyncPipelineExecutor> pushing_async_executor;
InternalProfileEventsQueuePtr profile_queue;
std::optional<Exception> exception;
@ -50,19 +52,28 @@ struct LocalQueryState
Progress progress;
/// Time after the last check to stop the request and send the progress.
Stopwatch after_send_progress;
Stopwatch after_send_profile_events;
std::unique_ptr<CurrentThread::QueryScope> query_scope_holder;
};
class LocalConnection : public IServerConnection, WithContext
{
public:
explicit LocalConnection(ContextPtr context_, bool send_progress_ = false);
explicit LocalConnection(
ContextPtr context_, bool send_progress_ = false, bool send_profile_events_ = false, const String & server_display_name_ = "");
~LocalConnection() override;
IServerConnection::Type getConnectionType() const override { return IServerConnection::Type::LOCAL; }
static ServerConnectionPtr createConnection(const ConnectionParameters & connection_parameters, ContextPtr current_context, bool send_progress = false);
static ServerConnectionPtr createConnection(
const ConnectionParameters & connection_parameters,
ContextPtr current_context,
bool send_progress = false,
bool send_profile_events = false,
const String & server_display_name = "");
void setDefaultDatabase(const String & database) override;
@ -129,12 +140,16 @@ private:
void updateProgress(const Progress & value);
void getProfileEvents(Block & block);
bool pollImpl();
ContextMutablePtr query_context;
Session session;
bool send_progress;
bool send_profile_events;
String server_display_name;
String description = "clickhouse-local";
std::optional<LocalQueryState> state;
@ -144,5 +159,7 @@ private:
std::optional<UInt64> next_packet_type;
String current_database;
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
};
}

@ -133,7 +133,12 @@ void MultiplexedConnections::sendQuery(
modified_settings.group_by_two_level_threshold_bytes = 0;
}
if (settings.max_parallel_replicas > 1 && settings.allow_experimental_parallel_reading_from_replicas)
bool parallel_reading_from_replicas = settings.max_parallel_replicas > 1
&& settings.allow_experimental_parallel_reading_from_replicas
/// To avoid trying to coordinate with clickhouse-benchmark,
/// since it uses the same code.
&& client_info.query_kind != ClientInfo::QueryKind::INITIAL_QUERY;
if (parallel_reading_from_replicas)
{
client_info.collaborate_with_initiator = true;
client_info.count_participating_replicas = replica_info.all_replicas_count;

@ -103,6 +103,7 @@ void Suggest::load(ContextPtr context, const ConnectionParameters & connection_p
{
loading_thread = std::thread([context=Context::createCopy(context), connection_parameters, suggestion_limit, this]
{
ThreadStatus thread_status;
for (size_t retry = 0; retry < 10; ++retry)
{
try

@ -664,6 +664,10 @@ void ConfigProcessor::savePreprocessedConfig(const LoadedConfig & loaded_config,
new_path.erase(0, main_config_path.size());
std::replace(new_path.begin(), new_path.end(), '/', '_');
/// If we have config file in YAML format, the preprocessed config will inherit .yaml extension
/// but will contain config in XML format, so some tools like clickhouse extract-from-config won't work
new_path = fs::path(new_path).replace_extension(".xml").string();
if (preprocessed_dir.empty())
{
if (!loaded_config.configuration->has("path"))

@ -9,6 +9,7 @@
#include <boost/noncopyable.hpp>
#include <base/strong_typedef.h>
#include <base/getPageSize.h>
#include <Common/Allocator.h>
#include <Common/Exception.h>
@ -196,7 +197,7 @@ protected:
/// The operation is slow and performed only for debug builds.
void protectImpl(int prot)
{
static constexpr size_t PROTECT_PAGE_SIZE = 4096;
static size_t PROTECT_PAGE_SIZE = ::getPageSize();
char * left_rounded_up = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_start) - pad_left + PROTECT_PAGE_SIZE - 1) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE);
char * right_rounded_down = reinterpret_cast<char *>((reinterpret_cast<intptr_t>(c_end_of_storage) + pad_right) / PROTECT_PAGE_SIZE * PROTECT_PAGE_SIZE);

@ -202,7 +202,7 @@ void print(IFourLetterCommand::StringBuffer & buf, const String & key, uint64_t
String MonitorCommand::run()
{
KeeperConnectionStats stats = keeper_dispatcher.getKeeperConnectionStats();
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
if (!keeper_info.has_leader)
@ -288,7 +288,7 @@ String ServerStatCommand::run()
writeText('\n', buf);
};
KeeperConnectionStats stats = keeper_dispatcher.getKeeperConnectionStats();
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
write("ClickHouse Keeper version", String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH);
@ -314,7 +314,7 @@ String StatCommand::run()
auto write = [&buf] (const String & key, const String & value) { buf << key << ": " << value << '\n'; };
KeeperConnectionStats stats = keeper_dispatcher.getKeeperConnectionStats();
auto & stats = keeper_dispatcher.getKeeperConnectionStats();
Keeper4LWInfo keeper_info = keeper_dispatcher.getKeeper4LWInfo();
write("ClickHouse Keeper version", String(VERSION_DESCRIBE) + "-" + VERSION_GITHASH);

@ -1,3 +1,4 @@
#include <atomic>
#include <Coordination/KeeperConnectionStats.h>
namespace DB
@ -5,61 +6,58 @@ namespace DB
uint64_t KeeperConnectionStats::getMinLatency() const
{
return min_latency;
return min_latency.load(std::memory_order_relaxed);
}
uint64_t KeeperConnectionStats::getMaxLatency() const
{
return max_latency;
return max_latency.load(std::memory_order_relaxed);
}
uint64_t KeeperConnectionStats::getAvgLatency() const
{
if (count != 0)
return total_latency / count;
auto cnt = count.load(std::memory_order_relaxed);
if (cnt)
return total_latency.load(std::memory_order_relaxed) / cnt;
return 0;
}
uint64_t KeeperConnectionStats::getLastLatency() const
{
return last_latency;
return last_latency.load(std::memory_order_relaxed);
}
uint64_t KeeperConnectionStats::getPacketsReceived() const
{
return packets_received;
return packets_received.load(std::memory_order_relaxed);
}
uint64_t KeeperConnectionStats::getPacketsSent() const
{
return packets_sent;
return packets_sent.load(std::memory_order_relaxed);
}
void KeeperConnectionStats::incrementPacketsReceived()
{
packets_received++;
packets_received.fetch_add(1, std::memory_order_relaxed);
}
void KeeperConnectionStats::incrementPacketsSent()
{
packets_sent++;
packets_sent.fetch_add(1, std::memory_order_relaxed);
}
void KeeperConnectionStats::updateLatency(uint64_t latency_ms)
{
last_latency = latency_ms;
total_latency += (latency_ms);
count++;
last_latency.store(latency_ms, std::memory_order_relaxed);
total_latency.fetch_add(latency_ms, std::memory_order_relaxed);
count.fetch_add(1, std::memory_order_relaxed);
if (latency_ms < min_latency)
{
min_latency = latency_ms;
}
uint64_t prev_val = min_latency.load(std::memory_order_relaxed);
while (prev_val > latency_ms && !min_latency.compare_exchange_weak(prev_val, latency_ms, std::memory_order_relaxed)) {}
if (latency_ms > max_latency)
{
max_latency = latency_ms;
}
prev_val = max_latency.load(std::memory_order_relaxed);
while (prev_val < latency_ms && !max_latency.compare_exchange_weak(prev_val, latency_ms, std::memory_order_relaxed)) {}
}
void KeeperConnectionStats::reset()
@ -70,17 +68,17 @@ void KeeperConnectionStats::reset()
void KeeperConnectionStats::resetLatency()
{
total_latency = 0;
count = 0;
max_latency = 0;
min_latency = 0;
last_latency = 0;
total_latency.store(0, std::memory_order_relaxed);
count.store(0, std::memory_order_relaxed);
max_latency.store(0, std::memory_order_relaxed);
min_latency.store(0, std::memory_order_relaxed);
last_latency.store(0, std::memory_order_relaxed);
}
void KeeperConnectionStats::resetRequestCounters()
{
packets_received = 0;
packets_sent = 0;
packets_received.store(0, std::memory_order_relaxed);
packets_sent.store(0, std::memory_order_relaxed);
}
}

@ -1,5 +1,6 @@
#pragma once
#include <atomic>
#include <base/types.h>
#include <memory>
#include <cstdint>
@ -11,7 +12,10 @@ namespace DB
class KeeperConnectionStats
{
public:
KeeperConnectionStats() = default;
KeeperConnectionStats()
{
reset();
}
uint64_t getMinLatency() const;
uint64_t getMaxLatency() const;
@ -33,20 +37,20 @@ private:
void resetRequestCounters();
/// all response with watch response included
uint64_t packets_sent = 0;
std::atomic_uint64_t packets_sent;
/// All user requests
uint64_t packets_received = 0;
std::atomic_uint64_t packets_received;
/// For consistent with zookeeper measured by millisecond,
/// otherwise maybe microsecond is better
uint64_t total_latency = 0;
uint64_t max_latency = 0;
uint64_t min_latency = 0;
std::atomic_uint64_t total_latency;
std::atomic_uint64_t max_latency;
std::atomic_uint64_t min_latency;
/// last operation latency
uint64_t last_latency = 0;
std::atomic_uint64_t last_latency;
uint64_t count = 0;
std::atomic_uint64_t count;
};
}

@ -594,7 +594,6 @@ void KeeperDispatcher::updateConfiguration(const Poco::Util::AbstractConfigurati
void KeeperDispatcher::updateKeeperStatLatency(uint64_t process_time_ms)
{
std::lock_guard lock(keeper_stats_mutex);
keeper_stats.updateLatency(process_time_ms);
}

@ -68,7 +68,6 @@ private:
/// RAFT wrapper.
std::unique_ptr<KeeperServer> server;
mutable std::mutex keeper_stats_mutex;
KeeperConnectionStats keeper_stats;
KeeperConfigurationAndSettingsPtr configuration_and_settings;
@ -159,9 +158,8 @@ public:
uint64_t getSnapDirSize() const;
/// Request statistics such as qps, latency etc.
KeeperConnectionStats getKeeperConnectionStats() const
KeeperConnectionStats & getKeeperConnectionStats()
{
std::lock_guard lock(keeper_stats_mutex);
return keeper_stats;
}
@ -179,19 +177,16 @@ public:
void incrementPacketsSent()
{
std::lock_guard lock(keeper_stats_mutex);
keeper_stats.incrementPacketsSent();
}
void incrementPacketsReceived()
{
std::lock_guard lock(keeper_stats_mutex);
keeper_stats.incrementPacketsReceived();
}
void resetConnectionStats()
{
std::lock_guard lock(keeper_stats_mutex);
keeper_stats.reset();
}
};

@ -39,7 +39,12 @@ namespace
request_for_session.request->xid = xid;
request_for_session.request->readImpl(buffer);
readIntBinary(request_for_session.time, buffer);
if (!buffer.eof())
readIntBinary(request_for_session.time, buffer);
else /// backward compatibility
request_for_session.time = std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::system_clock::now().time_since_epoch()).count();
return request_for_session;
}
}

@ -3,6 +3,8 @@
#include <Coordination/Defines.h>
#include <Common/Exception.h>
#include <filesystem>
#include <Common/isLocalAddress.h>
#include <Common/DNSResolver.h>
namespace DB
{
@ -12,6 +14,70 @@ namespace ErrorCodes
extern const int RAFT_ERROR;
}
namespace
{
bool isLoopback(const std::string & hostname)
{
try
{
return DNSResolver::instance().resolveHost(hostname).isLoopback();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return false;
}
bool isLocalhost(const std::string & hostname)
{
try
{
return isLocalAddress(DNSResolver::instance().resolveHost(hostname));
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
return false;
}
std::unordered_map<UInt64, std::string> getClientPorts(const Poco::Util::AbstractConfiguration & config)
{
static const char * config_port_names[] = {
"keeper_server.tcp_port",
"keeper_server.tcp_port_secure",
"interserver_http_port",
"interserver_https_port",
"tcp_port",
"tcp_with_proxy_port",
"tcp_port_secure",
"mysql_port",
"postgresql_port",
"grpc_port",
"prometheus.port",
};
std::unordered_map<UInt64, std::string> ports;
for (const auto & config_port_name : config_port_names)
{
if (config.has(config_port_name))
ports[config.getUInt64(config_port_name)] = config_port_name;
}
return ports;
}
}
/// This function is quite long because it contains a lot of sanity checks in config:
/// 1. No duplicate endpoints
/// 2. No "localhost" or "127.0.0.1" or another local addresses mixed with normal addresses
/// 3. Raft internal port is not equal to any other port for client
/// 4. No duplicate IDs
/// 5. Our ID present in hostnames list
KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersConfiguration(const Poco::Util::AbstractConfiguration & config, bool allow_without_us) const
{
KeeperConfigurationWrapper result;
@ -19,12 +85,17 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
Poco::Util::AbstractConfiguration::Keys keys;
config.keys(config_prefix + ".raft_configuration", keys);
auto client_ports = getClientPorts(config);
/// Sometimes (especially in cloud envs) users can provide incorrect
/// configuration with duplicated raft ids or endpoints. We check them
/// on config parsing stage and never commit to quorum.
std::unordered_map<std::string, int> check_duplicated_hostnames;
size_t total_servers = 0;
std::string loopback_hostname;
std::string non_local_hostname;
size_t local_address_counter = 0;
for (const auto & server_key : keys)
{
if (!startsWith(server_key, "server"))
@ -38,13 +109,33 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
int32_t priority = config.getInt(full_prefix + ".priority", 1);
bool start_as_follower = config.getBool(full_prefix + ".start_as_follower", false);
if (client_ports.count(port) != 0)
{
throw Exception(ErrorCodes::RAFT_ERROR, "Raft configuration contains hostname '{}' with port '{}' which is equal to '{}' in server configuration",
hostname, port, client_ports[port]);
}
if (isLoopback(hostname))
{
loopback_hostname = hostname;
local_address_counter++;
}
else if (isLocalhost(hostname))
{
local_address_counter++;
}
else
{
non_local_hostname = hostname;
}
if (start_as_follower)
result.servers_start_as_followers.insert(new_server_id);
auto endpoint = hostname + ":" + std::to_string(port);
if (check_duplicated_hostnames.count(endpoint))
{
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate endpoints: "
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate endpoints: "
"endpoint {} has been already added with id {}, but going to add it one more time with id {}",
endpoint, check_duplicated_hostnames[endpoint], new_server_id);
}
@ -54,7 +145,7 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
for (const auto & [id_endpoint, id] : check_duplicated_hostnames)
{
if (new_server_id == id)
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contain duplicate ids: id {} has been already added with endpoint {}, "
throw Exception(ErrorCodes::RAFT_ERROR, "Raft config contains duplicate ids: id {} has been already added with endpoint {}, "
"but going to add it one more time with endpoint {}", id, id_endpoint, endpoint);
}
check_duplicated_hostnames.emplace(endpoint, new_server_id);
@ -77,6 +168,24 @@ KeeperStateManager::KeeperConfigurationWrapper KeeperStateManager::parseServersC
if (result.servers_start_as_followers.size() == total_servers)
throw Exception(ErrorCodes::RAFT_ERROR, "At least one of servers should be able to start as leader (without <start_as_follower>)");
if (!loopback_hostname.empty() && !non_local_hostname.empty())
{
throw Exception(
ErrorCodes::RAFT_ERROR,
"Mixing loopback and non-local hostnames ('{}' and '{}') in raft_configuration is not allowed. "
"Different hosts can resolve it to themselves so it's not allowed.",
loopback_hostname, non_local_hostname);
}
if (!non_local_hostname.empty() && local_address_counter > 1)
{
throw Exception(
ErrorCodes::RAFT_ERROR,
"Local address specified more than once ({} times) and non-local hostnames also exists ('{}') in raft_configuration. "
"Such configuration is not allowed because single host can vote multiple times.",
local_address_counter, non_local_hostname);
}
return result;
}
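To make the two locality rules enforced above easier to follow, here is a minimal standalone sketch under illustrative names (RaftServer and validateLocality are hypothetical, not the KeeperStateManager API):

#include <iostream>
#include <stdexcept>
#include <string>
#include <vector>

struct RaftServer { std::string hostname; bool is_loopback; bool is_local; };

/// Mirrors the checks above: loopback hostnames must not be mixed with
/// non-local ones, and a local address may appear at most once when
/// non-local hostnames are present.
void validateLocality(const std::vector<RaftServer> & servers)
{
    std::string loopback_hostname;
    std::string non_local_hostname;
    size_t local_address_counter = 0;
    for (const auto & server : servers)
    {
        if (server.is_loopback) { loopback_hostname = server.hostname; ++local_address_counter; }
        else if (server.is_local) { ++local_address_counter; }
        else { non_local_hostname = server.hostname; }
    }
    if (!loopback_hostname.empty() && !non_local_hostname.empty())
        throw std::runtime_error("Mixing loopback and non-local hostnames: " + loopback_hostname + " and " + non_local_hostname);
    if (!non_local_hostname.empty() && local_address_counter > 1)
        throw std::runtime_error("Local address specified more than once together with non-local hostname " + non_local_hostname);
}

int main()
{
    try
    {
        validateLocality({{"127.0.0.1", true, true}, {"keeper2.example.com", false, false}});
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << '\n';   /// rejected: loopback mixed with a real hostname
    }
}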

View File

@ -8,7 +8,6 @@
#define DBMS_MIN_REVISION_WITH_SERVER_DISPLAY_NAME 54372
#define DBMS_MIN_REVISION_WITH_VERSION_PATCH 54401
#define DBMS_MIN_REVISION_WITH_SERVER_LOGS 54406
#define DBMS_MIN_REVISION_WITH_CLIENT_SUPPORT_EMBEDDED_DATA 54415
/// Minimum revision with exactly the same set of aggregation methods and rules to select them.
/// Two-level (bucketed) aggregation is incompatible if servers are inconsistent in these rules
/// (keys will be placed in different buckets and result will not be fully aggregated).

View File

@ -30,7 +30,7 @@ namespace ErrorCodes
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table",
"update_field", "update_tag", "invalidate_query", "query", "where", "name", "secure"};
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "secure"};
namespace
{

View File

@ -34,7 +34,7 @@ static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password",
"db", "database", "table", "schema",
"update_field", "invalidate_query", "priority",
"update_tag", "dont_check_update_time",
"update_lag", "dont_check_update_time",
"query", "where", "name" /* name_collection */, "socket",
"share_connection", "fail_on_connection_loss", "close_connection",
"ssl_ca", "ssl_cert", "ssl_key",

View File

@ -30,7 +30,7 @@ static const UInt64 max_block_size = 8192;
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db", "database", "table", "schema",
"update_field", "update_tag", "invalidate_query", "query", "where", "name", "priority"};
"update_field", "update_lag", "invalidate_query", "query", "where", "name", "priority"};
namespace
{

View File

@ -6,6 +6,7 @@
#include <Interpreters/Context.h>
#include <Common/filesystemHelpers.h>
#include <Common/quoteString.h>
#include <Common/renameat2.h>
#include <IO/createReadBufferFromFileBase.h>
#include <fstream>
@ -325,7 +326,7 @@ DiskDirectoryIteratorPtr DiskLocal::iterateDirectory(const String & path)
void DiskLocal::moveFile(const String & from_path, const String & to_path)
{
fs::rename(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
renameNoReplace(fs::path(disk_path) / from_path, fs::path(disk_path) / to_path);
}
void DiskLocal::replaceFile(const String & from_path, const String & to_path)

View File

@ -286,7 +286,7 @@ std::unique_ptr<WriteBufferFromFileBase> DiskS3::writeFile(const String & path,
settings->s3_upload_part_size_multiply_parts_count_threshold,
settings->s3_max_single_part_upload_size,
std::move(object_metadata),
buf_size /*, std::move(schedule) */);
buf_size, std::move(schedule));
auto create_metadata_callback = [this, path, blob_name, mode] (size_t count)
{

View File

@ -267,7 +267,7 @@ public:
*/
virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const
{
throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED);
}
};
@ -452,7 +452,7 @@ public:
using Monotonicity = IFunctionBase::Monotonicity;
virtual Monotonicity getMonotonicityForRange(const IDataType & /*type*/, const Field & /*left*/, const Field & /*right*/) const
{
throw Exception("Function " + getName() + " has no information about its monotonicity.", ErrorCodes::NOT_IMPLEMENTED);
throw Exception("Function " + getName() + " has no information about its monotonicity", ErrorCodes::NOT_IMPLEMENTED);
}
/// For non-variadic functions, return number of arguments; otherwise return zero (that should be ignored).

View File

@ -207,7 +207,6 @@ public:
using ShardsInfo = std::vector<ShardInfo>;
String getHashOfAddresses() const { return hash_of_addresses; }
const ShardsInfo & getShardsInfo() const { return shards_info; }
const AddressesWithFailover & getShardsAddresses() const { return addresses_with_failover; }
@ -263,7 +262,6 @@ private:
/// Inter-server secret
String secret;
String hash_of_addresses;
/// Description of the cluster shards.
ShardsInfo shards_info;
/// Any remote shard.

View File

@ -116,7 +116,7 @@ void executeQuery(
const Settings & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::vector<QueryPlanPtr> plans;
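The comparison change from '>' to '>=' tightens the limit by one level: the depth is incremented after the check on every hop, so max_distributed_depth now bounds the number of nested Distributed hops exactly. A small self-contained sketch of that arithmetic (simplified, not the real executeQuery flow):

#include <iostream>

int main()
{
    const unsigned max_distributed_depth = 1;
    unsigned depth = 0;
    for (int hop = 1; hop <= 3; ++hop)
    {
        bool rejected_new = depth >= max_distributed_depth;  /// current check
        bool rejected_old = depth > max_distributed_depth;   /// previous check
        std::cout << "hop " << hop << ": new=" << (rejected_new ? "throw" : "ok")
                  << " old=" << (rejected_old ? "throw" : "ok") << '\n';
        ++depth;  /// context->getClientInfo().distributed_depth += 1 in the real code
    }
}

With max_distributed_depth = 1, the second hop is now rejected, whereas the old check let one extra level through.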

View File

@ -179,10 +179,10 @@ struct ContextSharedPart
mutable VolumePtr backups_volume; /// Volume for all the backups.
mutable std::optional<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::optional<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::optional<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
mutable std::optional<ExternalModelsLoader> external_models_loader;
mutable std::unique_ptr<EmbeddedDictionaries> embedded_dictionaries; /// Metrica's dictionaries. Have lazy initialization.
mutable std::unique_ptr<ExternalDictionariesLoader> external_dictionaries_loader;
mutable std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> external_user_defined_executable_functions_loader;
mutable std::unique_ptr<ExternalModelsLoader> external_models_loader;
ExternalLoaderXMLConfigRepository * external_models_config_repository = nullptr;
scope_guard models_repository_guard;
@ -214,10 +214,10 @@ struct ContextSharedPart
ConfigurationPtr users_config; /// Config with the users, profiles and quotas sections.
InterserverIOHandler interserver_io_handler; /// Handler for interserver communication.
mutable std::optional<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::optional<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::optional<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::optional<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable std::unique_ptr<BackgroundSchedulePool> buffer_flush_schedule_pool; /// A thread pool that can do background flush for Buffer tables.
mutable std::unique_ptr<BackgroundSchedulePool> schedule_pool; /// A thread pool that can run different jobs in background (used in replicated tables)
mutable std::unique_ptr<BackgroundSchedulePool> distributed_schedule_pool; /// A thread pool that can run different jobs in background (used for distributed sends)
mutable std::unique_ptr<BackgroundSchedulePool> message_broker_schedule_pool; /// A thread pool that can run different jobs in background (used for message brokers, like RabbitMQ and Kafka)
mutable ThrottlerPtr replicated_fetches_throttler; /// A server-wide throttler for replicated fetches
mutable ThrottlerPtr replicated_sends_throttler; /// A server-wide throttler for replicated sends
@ -344,12 +344,23 @@ struct ContextSharedPart
common_executor->wait();
std::unique_ptr<SystemLogs> delete_system_logs;
std::unique_ptr<EmbeddedDictionaries> delete_embedded_dictionaries;
std::unique_ptr<ExternalDictionariesLoader> delete_external_dictionaries_loader;
std::unique_ptr<ExternalUserDefinedExecutableFunctionsLoader> delete_external_user_defined_executable_functions_loader;
std::unique_ptr<ExternalModelsLoader> delete_external_models_loader;
std::unique_ptr<BackgroundSchedulePool> delete_buffer_flush_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_distributed_schedule_pool;
std::unique_ptr<BackgroundSchedulePool> delete_message_broker_schedule_pool;
std::unique_ptr<DDLWorker> delete_ddl_worker;
std::unique_ptr<AccessControl> delete_access_control;
{
auto lock = std::lock_guard(mutex);
/** Compiled expressions stored in cache need to be destroyed before destruction of static objects.
* Because CHJIT instance can be static object.
*/
#if USE_EMBEDDED_COMPILER
if (auto * cache = CompiledExpressionCacheFactory::instance().tryGetCache())
cache->reset();
@ -369,19 +380,19 @@ struct ContextSharedPart
/// but at least they can be preserved for storage termination.
dictionaries_xmls.reset();
user_defined_executable_functions_xmls.reset();
models_repository_guard.reset();
delete_system_logs = std::move(system_logs);
embedded_dictionaries.reset();
external_dictionaries_loader.reset();
external_user_defined_executable_functions_loader.reset();
models_repository_guard.reset();
external_models_loader.reset();
buffer_flush_schedule_pool.reset();
schedule_pool.reset();
distributed_schedule_pool.reset();
message_broker_schedule_pool.reset();
ddl_worker.reset();
access_control.reset();
delete_embedded_dictionaries = std::move(embedded_dictionaries);
delete_external_dictionaries_loader = std::move(external_dictionaries_loader);
delete_external_user_defined_executable_functions_loader = std::move(external_user_defined_executable_functions_loader);
delete_external_models_loader = std::move(external_models_loader);
delete_buffer_flush_schedule_pool = std::move(buffer_flush_schedule_pool);
delete_schedule_pool = std::move(schedule_pool);
delete_distributed_schedule_pool = std::move(distributed_schedule_pool);
delete_message_broker_schedule_pool = std::move(message_broker_schedule_pool);
delete_ddl_worker = std::move(ddl_worker);
delete_access_control = std::move(access_control);
/// Stop trace collector if any
trace_collector.reset();
@ -391,6 +402,17 @@ struct ContextSharedPart
/// Can be removed w/o context lock
delete_system_logs.reset();
delete_embedded_dictionaries.reset();
delete_external_dictionaries_loader.reset();
delete_external_user_defined_executable_functions_loader.reset();
delete_external_models_loader.reset();
delete_ddl_worker.reset();
delete_buffer_flush_schedule_pool.reset();
delete_schedule_pool.reset();
delete_distributed_schedule_pool.reset();
delete_message_broker_schedule_pool.reset();
delete_access_control.reset();
}
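Switching these members from std::optional to std::unique_ptr is what allows the shutdown code above to move them out under the lock and destroy them afterwards, so heavy destructors never run while the context mutex is held. A minimal sketch of the pattern with a hypothetical Service type:

#include <iostream>
#include <memory>
#include <mutex>

struct Service
{
    ~Service() { std::cout << "slow destructor runs outside the lock\n"; }
};

struct Shared
{
    std::mutex mutex;
    std::unique_ptr<Service> service = std::make_unique<Service>();

    void shutdown()
    {
        std::unique_ptr<Service> delete_service;
        {
            std::lock_guard lock(mutex);
            delete_service = std::move(service);   /// only the pointer moves under the lock
        }
        delete_service.reset();                    /// destruction happens after the lock is released
    }
};

int main()
{
    Shared shared;
    shared.shutdown();
}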
bool hasTraceCollector() const
@ -1365,7 +1387,8 @@ ExternalDictionariesLoader & Context::getExternalDictionariesLoader()
ExternalDictionariesLoader & Context::getExternalDictionariesLoaderUnlocked()
{
if (!shared->external_dictionaries_loader)
shared->external_dictionaries_loader.emplace(getGlobalContext());
shared->external_dictionaries_loader =
std::make_unique<ExternalDictionariesLoader>(getGlobalContext());
return *shared->external_dictionaries_loader;
}
@ -1383,7 +1406,8 @@ ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedEx
ExternalUserDefinedExecutableFunctionsLoader & Context::getExternalUserDefinedExecutableFunctionsLoaderUnlocked()
{
if (!shared->external_user_defined_executable_functions_loader)
shared->external_user_defined_executable_functions_loader.emplace(getGlobalContext());
shared->external_user_defined_executable_functions_loader =
std::make_unique<ExternalUserDefinedExecutableFunctionsLoader>(getGlobalContext());
return *shared->external_user_defined_executable_functions_loader;
}
@ -1401,7 +1425,8 @@ ExternalModelsLoader & Context::getExternalModelsLoader()
ExternalModelsLoader & Context::getExternalModelsLoaderUnlocked()
{
if (!shared->external_models_loader)
shared->external_models_loader.emplace(getGlobalContext());
shared->external_models_loader =
std::make_unique<ExternalModelsLoader>(getGlobalContext());
return *shared->external_models_loader;
}
@ -1436,7 +1461,7 @@ EmbeddedDictionaries & Context::getEmbeddedDictionariesImpl(const bool throw_on_
{
auto geo_dictionaries_loader = std::make_unique<GeoDictionariesLoader>();
shared->embedded_dictionaries.emplace(
shared->embedded_dictionaries = std::make_unique<EmbeddedDictionaries>(
std::move(geo_dictionaries_loader),
getGlobalContext(),
throw_on_error);
@ -1695,7 +1720,7 @@ BackgroundSchedulePool & Context::getBufferFlushSchedulePool() const
{
auto lock = getLock();
if (!shared->buffer_flush_schedule_pool)
shared->buffer_flush_schedule_pool.emplace(
shared->buffer_flush_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_buffer_flush_schedule_pool_size,
CurrentMetrics::BackgroundBufferFlushSchedulePoolTask,
"BgBufSchPool");
@ -1737,7 +1762,7 @@ BackgroundSchedulePool & Context::getSchedulePool() const
{
auto lock = getLock();
if (!shared->schedule_pool)
shared->schedule_pool.emplace(
shared->schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_schedule_pool_size,
CurrentMetrics::BackgroundSchedulePoolTask,
"BgSchPool");
@ -1748,7 +1773,7 @@ BackgroundSchedulePool & Context::getDistributedSchedulePool() const
{
auto lock = getLock();
if (!shared->distributed_schedule_pool)
shared->distributed_schedule_pool.emplace(
shared->distributed_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_distributed_schedule_pool_size,
CurrentMetrics::BackgroundDistributedSchedulePoolTask,
"BgDistSchPool");
@ -1759,7 +1784,7 @@ BackgroundSchedulePool & Context::getMessageBrokerSchedulePool() const
{
auto lock = getLock();
if (!shared->message_broker_schedule_pool)
shared->message_broker_schedule_pool.emplace(
shared->message_broker_schedule_pool = std::make_unique<BackgroundSchedulePool>(
settings.background_message_broker_schedule_pool_size,
CurrentMetrics::BackgroundMessageBrokerSchedulePoolTask,
"BgMBSchPool");

View File

@ -1902,20 +1902,16 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
else if (interpreter_subquery)
{
/// Subquery.
/// If we need fewer columns than the subquery provides, update the interpreter.
if (required_columns.size() < source_header.columns())
{
ASTPtr subquery = extractTableExpression(query, 0);
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
ASTPtr subquery = extractTableExpression(query, 0);
if (!subquery)
throw Exception("Subquery expected", ErrorCodes::LOGICAL_ERROR);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
interpreter_subquery = std::make_unique<InterpreterSelectWithUnionQuery>(
subquery, getSubqueryContext(context),
options.copy().subquery().noModify(), required_columns);
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
}
if (query_analyzer->hasAggregation())
interpreter_subquery->ignoreWithTotals();
interpreter_subquery->buildQueryPlan(query_plan);
query_plan.addInterpreterContext(context);

View File

@ -208,8 +208,10 @@ Block InterpreterSelectWithUnionQuery::getCurrentChildResultHeader(const ASTPtr
if (ast_ptr_->as<ASTSelectWithUnionQuery>())
return InterpreterSelectWithUnionQuery(ast_ptr_, context, options.copy().analyze().noModify(), required_result_column_names)
.getSampleBlock();
else
else if (ast_ptr_->as<ASTSelectQuery>())
return InterpreterSelectQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock();
else
return InterpreterSelectIntersectExceptQuery(ast_ptr_, context, options.copy().analyze().noModify()).getSampleBlock();
}
std::unique_ptr<IInterpreterUnionOrSelectQuery>

View File

@ -183,7 +183,9 @@ std::unique_ptr<InterpreterSelectWithUnionQuery> JoinedTables::makeLeftTableSubq
{
if (!isLeftTableSubquery())
return {};
return std::make_unique<InterpreterSelectWithUnionQuery>(left_table_expression, context, select_options);
/// Only build a dry-run interpreter during analysis. The subquery interpreter will be reconstructed during plan building.
return std::make_unique<InterpreterSelectWithUnionQuery>(left_table_expression, context, select_options.copy().analyze());
}
StoragePtr JoinedTables::getLeftTableStorage()

View File

@ -1,5 +1,7 @@
#include "ProfileEventsExt.h"
#include <Common/typeid_cast.h>
#include <Common/MemoryTracker.h>
#include <Common/CurrentThread.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnArray.h>
@ -36,7 +38,7 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
if (nonzero_only && 0 == value)
continue;
const char * desc = ProfileEvents::getName(event);
const char * desc = getName(event);
key_column.insertData(desc, strlen(desc));
value_column.insert(value);
size++;
@ -45,4 +47,133 @@ void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column,
offsets.push_back(offsets.back() + size);
}
/// Add records about provided non-zero ProfileEvents::Counters.
static void dumpProfileEvents(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
auto & value_column = columns[VALUE_COLUMN_INDEX];
for (Event event = 0; event < Counters::num_counters; ++event)
{
Int64 value = snapshot.counters[event];
if (value == 0)
continue;
const char * desc = getName(event);
name_column->insertData(desc, strlen(desc));
value_column->insert(value);
rows++;
}
// Fill the rest of the columns with data
for (size_t row = 0; row < rows; ++row)
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(Type::INCREMENT);
}
}
static void dumpMemoryTracker(ProfileEventsSnapshot const & snapshot, DB::MutableColumns & columns, String const & host_name)
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(Type::GAUGE);
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
columns[i++]->insert(snapshot.memory_usage);
}
void getProfileEvents(
const String & server_display_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots)
{
using namespace DB;
static const NamesAndTypesList column_names_and_types = {
{"host_name", std::make_shared<DataTypeString>()},
{"current_time", std::make_shared<DataTypeDateTime>()},
{"thread_id", std::make_shared<DataTypeUInt64>()},
{"type", TypeEnum},
{"name", std::make_shared<DataTypeString>()},
{"value", std::make_shared<DataTypeInt64>()},
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
block = std::move(temp_columns);
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
Block curr_block;
size_t rows = 0;
for (; profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
bool empty = columns[0]->empty();
if (!empty)
block.setColumns(std::move(columns));
}
}
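The per-thread bookkeeping above boils down to caching the last sent counters and emitting only the delta on the next call. A simplified standalone sketch with plain integers instead of ProfileEvents counters:

#include <iostream>
#include <unordered_map>

using ThreadId = unsigned long long;

std::unordered_map<ThreadId, long long> last_sent;

/// First call for a thread sends the full value; later calls send only the increment.
long long incrementSince(ThreadId thread_id, long long current_value)
{
    auto it = last_sent.find(thread_id);
    long long increment = (it == last_sent.end()) ? current_value : current_value - it->second;
    last_sent[thread_id] = current_value;
    return increment;
}

int main()
{
    std::cout << incrementSince(42, 100) << '\n';  /// 100 (first send: full value)
    std::cout << incrementSince(42, 130) << '\n';  /// 30  (only the delta)
}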

View File

@ -1,5 +1,6 @@
#pragma once
#include <Common/ProfileEvents.h>
#include <Common/ThreadStatus.h>
#include <DataTypes/DataTypeEnum.h>
#include <Columns/IColumn.h>
@ -7,9 +8,28 @@
namespace ProfileEvents
{
constexpr size_t NAME_COLUMN_INDEX = 4;
constexpr size_t VALUE_COLUMN_INDEX = 5;
struct ProfileEventsSnapshot
{
UInt64 thread_id;
CountersIncrement counters;
Int64 memory_usage;
time_t current_time;
};
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, Counters::Snapshot>;
/// Dumps profile events to columns Map(String, UInt64)
void dumpToMapColumn(const Counters::Snapshot & counters, DB::IColumn * column, bool nonzero_only = true);
void getProfileEvents(
const String & server_display_name,
DB::InternalProfileEventsQueuePtr profile_queue,
DB::Block & block,
ThreadIdToCountersSnapshot & last_sent_snapshots);
/// This is for ProfileEvents packets.
enum Type : int8_t
{

View File

@ -445,7 +445,7 @@ MergeTreeSetIndex::MergeTreeSetIndex(const Columns & set_elements, std::vector<K
* 1: the intersection of the set and the range is non-empty
* 2: the range contains elements not in the set
*/
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const
BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point) const
{
size_t tuple_size = indexes_mapping.size();
@ -468,7 +468,8 @@ BoolMask MergeTreeSetIndex::checkInRange(const std::vector<Range> & key_ranges,
std::optional<Range> new_range = KeyCondition::applyMonotonicFunctionsChainToRange(
key_ranges[indexes_mapping[i].key_index],
indexes_mapping[i].functions,
data_types[indexes_mapping[i].key_index]);
data_types[indexes_mapping[i].key_index],
single_point);
if (!new_range)
return {true, true};

View File

@ -214,7 +214,7 @@ public:
bool hasMonotonicFunctionsChain() const;
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types) const;
BoolMask checkInRange(const std::vector<Range> & key_ranges, const DataTypes & data_types, bool single_point = false) const;
private:
// If all arguments in tuple are key columns, we can optimize NOT IN when there is only one element.

View File

@ -442,9 +442,9 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("^["),
to_remove,
std::make_shared<ASTLiteral>("]*|["),
std::make_shared<ASTLiteral>("]+|["),
to_remove,
std::make_shared<ASTLiteral>("]*$")
std::make_shared<ASTLiteral>("]+$")
};
func_name = "replaceRegexpAll";
}
@ -455,7 +455,7 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("^["),
to_remove,
std::make_shared<ASTLiteral>("]*")
std::make_shared<ASTLiteral>("]+")
};
}
else
@ -464,7 +464,7 @@ namespace
pattern_list_args->children = {
std::make_shared<ASTLiteral>("["),
to_remove,
std::make_shared<ASTLiteral>("]*$")
std::make_shared<ASTLiteral>("]+$")
};
}
func_name = "replaceRegexpOne";

View File

@ -210,7 +210,7 @@ static Block adaptBlockStructure(const Block & block, const Block & header)
return res;
}
void RemoteQueryExecutor::sendQuery()
void RemoteQueryExecutor::sendQuery(ClientInfo::QueryKind query_kind)
{
if (sent_query)
return;
@ -237,13 +237,7 @@ void RemoteQueryExecutor::sendQuery()
auto timeouts = ConnectionTimeouts::getTCPTimeoutsWithFailover(settings);
ClientInfo modified_client_info = context->getClientInfo();
modified_client_info.query_kind = ClientInfo::QueryKind::SECONDARY_QUERY;
/// Set initial_query_id to query_id for the clickhouse-benchmark.
///
/// (since first query of clickhouse-benchmark will be issued as SECONDARY_QUERY,
/// due to it executes queries via RemoteBlockInputStream)
if (modified_client_info.initial_query_id.empty())
modified_client_info.initial_query_id = query_id;
modified_client_info.query_kind = query_kind;
if (CurrentThread::isInitialized())
{
modified_client_info.client_trace_context = CurrentThread::get().thread_trace_context;

View File

@ -83,7 +83,13 @@ public:
~RemoteQueryExecutor();
/// Create connection and send query, external tables and scalars.
void sendQuery();
///
/// @param query_kind - kind of query, usually SECONDARY_QUERY,
/// since these are queries sent between servers
/// (the case this code was primarily written for).
/// But clickhouse-benchmark uses the same code
/// and should pass INITIAL_QUERY.
void sendQuery(ClientInfo::QueryKind query_kind = ClientInfo::QueryKind::SECONDARY_QUERY);
/// Query is resent to a replica, the query itself can be modified.
std::atomic<bool> resent_query { false };
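As a small illustration of the comment above, a sketch of the defaulted parameter with hypothetical types (not the real RemoteQueryExecutor): server-to-server callers rely on the SECONDARY_QUERY default, while a benchmark-style caller passes INITIAL_QUERY explicitly.

#include <iostream>

enum class QueryKind { INITIAL_QUERY, SECONDARY_QUERY };

void sendQuery(QueryKind query_kind = QueryKind::SECONDARY_QUERY)
{
    std::cout << (query_kind == QueryKind::INITIAL_QUERY ? "initial" : "secondary") << '\n';
}

int main()
{
    sendQuery();                           /// distributed sub-query between servers
    sendQuery(QueryKind::INITIAL_QUERY);   /// benchmark issuing the query directly
}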

View File

@ -544,19 +544,13 @@ std::pair<Coordination::OpNum, Coordination::XID> KeeperTCPHandler::receiveReque
void KeeperTCPHandler::packageSent()
{
{
std::lock_guard lock(conn_stats_mutex);
conn_stats.incrementPacketsSent();
}
conn_stats.incrementPacketsSent();
keeper_dispatcher->incrementPacketsSent();
}
void KeeperTCPHandler::packageReceived()
{
{
std::lock_guard lock(conn_stats_mutex);
conn_stats.incrementPacketsReceived();
}
conn_stats.incrementPacketsReceived();
keeper_dispatcher->incrementPacketsReceived();
}
@ -566,10 +560,7 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
if (response->xid != Coordination::WATCH_XID && response->getOpNum() != Coordination::OpNum::Heartbeat)
{
Int64 elapsed = (Poco::Timestamp() - operations[response->xid]) / 1000;
{
std::lock_guard lock(conn_stats_mutex);
conn_stats.updateLatency(elapsed);
}
conn_stats.updateLatency(elapsed);
operations.erase(response->xid);
keeper_dispatcher->updateKeeperStatLatency(elapsed);
@ -584,15 +575,14 @@ void KeeperTCPHandler::updateStats(Coordination::ZooKeeperResponsePtr & response
}
KeeperConnectionStats KeeperTCPHandler::getConnectionStats() const
KeeperConnectionStats & KeeperTCPHandler::getConnectionStats()
{
std::lock_guard lock(conn_stats_mutex);
return conn_stats;
}
void KeeperTCPHandler::dumpStats(WriteBufferFromOwnString & buf, bool brief)
{
KeeperConnectionStats stats = getConnectionStats();
auto & stats = getConnectionStats();
writeText(' ', buf);
writeText(socket().peerAddress().toString(), buf);
@ -641,10 +631,7 @@ void KeeperTCPHandler::dumpStats(WriteBufferFromOwnString & buf, bool brief)
void KeeperTCPHandler::resetStats()
{
{
std::lock_guard lock(conn_stats_mutex);
conn_stats.reset();
}
conn_stats.reset();
last_op.set(std::make_unique<LastOp>(EMPTY_LAST_OP));
}

View File

@ -51,7 +51,7 @@ public:
KeeperTCPHandler(IServer & server_, const Poco::Net::StreamSocket & socket_);
void run() override;
KeeperConnectionStats getConnectionStats() const;
KeeperConnectionStats & getConnectionStats();
void dumpStats(WriteBufferFromOwnString & buf, bool brief);
void resetStats();
@ -100,7 +100,6 @@ private:
LastOpMultiVersion last_op;
mutable std::mutex conn_stats_mutex;
KeeperConnectionStats conn_stats;
};

View File

@ -31,7 +31,6 @@
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/OpenTelemetrySpanLog.h>
#include <Interpreters/Session.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Server/TCPServer.h>
#include <Storages/StorageReplicatedMergeTree.h>
#include <Storages/MergeTree/MergeTreeDataPartUUID.h>
@ -853,163 +852,15 @@ void TCPHandler::sendExtremes(const Block & extremes)
}
}
namespace
{
using namespace ProfileEvents;
constexpr size_t NAME_COLUMN_INDEX = 4;
constexpr size_t VALUE_COLUMN_INDEX = 5;
struct ProfileEventsSnapshot
{
UInt64 thread_id;
ProfileEvents::CountersIncrement counters;
Int64 memory_usage;
time_t current_time;
};
/*
* Add records about provided non-zero ProfileEvents::Counters.
*/
void dumpProfileEvents(
ProfileEventsSnapshot const & snapshot,
MutableColumns & columns,
String const & host_name)
{
size_t rows = 0;
auto & name_column = columns[NAME_COLUMN_INDEX];
auto & value_column = columns[VALUE_COLUMN_INDEX];
for (ProfileEvents::Event event = 0; event < ProfileEvents::Counters::num_counters; ++event)
{
Int64 value = snapshot.counters[event];
if (value == 0)
continue;
const char * desc = ProfileEvents::getName(event);
name_column->insertData(desc, strlen(desc));
value_column->insert(value);
rows++;
}
// Fill the rest of the columns with data
for (size_t row = 0; row < rows; ++row)
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::INCREMENT);
}
}
void dumpMemoryTracker(
ProfileEventsSnapshot const & snapshot,
MutableColumns & columns,
String const & host_name)
{
{
size_t i = 0;
columns[i++]->insertData(host_name.data(), host_name.size());
columns[i++]->insert(UInt64(snapshot.current_time));
columns[i++]->insert(UInt64{snapshot.thread_id});
columns[i++]->insert(ProfileEvents::Type::GAUGE);
columns[i++]->insertData(MemoryTracker::USAGE_EVENT_NAME, strlen(MemoryTracker::USAGE_EVENT_NAME));
columns[i++]->insert(snapshot.memory_usage);
}
}
}
void TCPHandler::sendProfileEvents()
{
if (client_tcp_protocol_version < DBMS_MIN_PROTOCOL_VERSION_WITH_INCREMENTAL_PROFILE_EVENTS)
return;
NamesAndTypesList column_names_and_types = {
{ "host_name", std::make_shared<DataTypeString>() },
{ "current_time", std::make_shared<DataTypeDateTime>() },
{ "thread_id", std::make_shared<DataTypeUInt64>() },
{ "type", ProfileEvents::TypeEnum },
{ "name", std::make_shared<DataTypeString>() },
{ "value", std::make_shared<DataTypeInt64>() },
};
ColumnsWithTypeAndName temp_columns;
for (auto const & name_and_type : column_names_and_types)
temp_columns.emplace_back(name_and_type.type, name_and_type.name);
Block block(std::move(temp_columns));
MutableColumns columns = block.mutateColumns();
auto thread_group = CurrentThread::getGroup();
auto const current_thread_id = CurrentThread::get().thread_id;
std::vector<ProfileEventsSnapshot> snapshots;
ThreadIdToCountersSnapshot new_snapshots;
ProfileEventsSnapshot group_snapshot;
Block block;
ProfileEvents::getProfileEvents(server_display_name, state.profile_queue, block, last_sent_snapshots);
if (block.rows() != 0)
{
auto stats = thread_group->getProfileEventsCountersAndMemoryForThreads();
snapshots.reserve(stats.size());
for (auto & stat : stats)
{
auto const thread_id = stat.thread_id;
if (thread_id == current_thread_id)
continue;
auto current_time = time(nullptr);
auto previous_snapshot = last_sent_snapshots.find(thread_id);
auto increment =
previous_snapshot != last_sent_snapshots.end()
? CountersIncrement(stat.counters, previous_snapshot->second)
: CountersIncrement(stat.counters);
snapshots.push_back(ProfileEventsSnapshot{
thread_id,
std::move(increment),
stat.memory_usage,
current_time
});
new_snapshots[thread_id] = std::move(stat.counters);
}
group_snapshot.thread_id = 0;
group_snapshot.current_time = time(nullptr);
group_snapshot.memory_usage = thread_group->memory_tracker.get();
auto group_counters = thread_group->performance_counters.getPartiallyAtomicSnapshot();
auto prev_group_snapshot = last_sent_snapshots.find(0);
group_snapshot.counters =
prev_group_snapshot != last_sent_snapshots.end()
? CountersIncrement(group_counters, prev_group_snapshot->second)
: CountersIncrement(group_counters);
new_snapshots[0] = std::move(group_counters);
}
last_sent_snapshots = std::move(new_snapshots);
for (auto & snapshot : snapshots)
{
dumpProfileEvents(snapshot, columns, server_display_name);
dumpMemoryTracker(snapshot, columns, server_display_name);
}
dumpProfileEvents(group_snapshot, columns, server_display_name);
dumpMemoryTracker(group_snapshot, columns, server_display_name);
MutableColumns logs_columns;
Block curr_block;
size_t rows = 0;
for (; state.profile_queue->tryPop(curr_block); ++rows)
{
auto curr_columns = curr_block.getColumns();
for (size_t j = 0; j < curr_columns.size(); ++j)
columns[j]->insertRangeFrom(*curr_columns[j], 0, curr_columns[j]->size());
}
bool empty = columns[0]->empty();
if (!empty)
{
block.setColumns(std::move(columns));
initProfileEventsBlockOutput(block);
writeVarUInt(Protocol::Server::ProfileEvents, *out);

View File

@ -3,9 +3,10 @@
#include <Poco/Net/TCPServerConnection.h>
#include <base/getFQDNOrHostName.h>
#include "Common/ProfileEvents.h"
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Common/Stopwatch.h>
#include <Common/ThreadStatus.h>
#include <Core/Protocol.h>
#include <Core/QueryProcessingStage.h>
#include <IO/Progress.h>
@ -13,7 +14,7 @@
#include <QueryPipeline/BlockIO.h>
#include <Interpreters/InternalTextLogsQueue.h>
#include <Interpreters/Context_fwd.h>
#include <Formats/NativeReader.h>
#include <Interpreters/ProfileEventsExt.h>
#include <Storages/MergeTree/ParallelReplicasReadingCoordinator.h>
@ -36,6 +37,8 @@ struct Settings;
class ColumnsDescription;
struct ProfileInfo;
class TCPServer;
class NativeWriter;
class NativeReader;
/// State of query processing.
struct QueryState
@ -189,9 +192,7 @@ private:
CurrentMetrics::Increment metric_increment{CurrentMetrics::TCPConnection};
using ThreadIdToCountersSnapshot = std::unordered_map<UInt64, ProfileEvents::Counters::Snapshot>;
ThreadIdToCountersSnapshot last_sent_snapshots;
ProfileEvents::ThreadIdToCountersSnapshot last_sent_snapshots;
/// It is the name of the server that will be sent to the client.
String server_display_name;

View File

@ -126,7 +126,7 @@ DistributedSink::DistributedSink(
, log(&Poco::Logger::get("DistributedBlockOutputStream"))
{
const auto & settings = context->getSettingsRef();
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth > settings.max_distributed_depth)
if (settings.max_distributed_depth && context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
context->getClientInfo().distributed_depth += 1;
random_shard_insert = settings.insert_distributed_one_random_shard && !storage.has_sharding_key;

View File

@ -34,7 +34,7 @@ IMPLEMENT_SETTINGS_TRAITS(EmptySettingsTraits, EMPTY_SETTINGS)
static const std::unordered_set<std::string_view> dictionary_allowed_keys = {
"host", "port", "user", "password", "db",
"database", "table", "schema", "replica",
"update_field", "update_tag", "invalidate_query", "query",
"update_field", "update_lag", "invalidate_query", "query",
"where", "name", "secure", "uri", "collection"};

View File

@ -16,7 +16,7 @@ struct ExternalDataSourceConfiguration
{
String host;
UInt16 port = 0;
String username;
String username = "default";
String password;
String database;
String table;

View File

@ -53,6 +53,7 @@ StorageFileLog::StorageFileLog(
ContextPtr context_,
const ColumnsDescription & columns_,
const String & path_,
const String & metadata_base_path_,
const String & format_name_,
std::unique_ptr<FileLogSettings> settings,
const String & comment,
@ -61,6 +62,7 @@ StorageFileLog::StorageFileLog(
, WithContext(context_->getGlobalContext())
, filelog_settings(std::move(settings))
, path(path_)
, metadata_base_path(std::filesystem::path(metadata_base_path_) / "metadata")
, format_name(format_name_)
, log(&Poco::Logger::get("StorageFileLog (" + table_id_.table_name + ")"))
, milliseconds_to_wait(filelog_settings->poll_directory_watch_events_backoff_init.totalMilliseconds())
@ -94,18 +96,24 @@ StorageFileLog::StorageFileLog(
void StorageFileLog::loadMetaFiles(bool attach)
{
const auto & storage = getStorageID();
/// FIXME Why do we need separate directory? Why not to use data directory?
root_meta_path
= std::filesystem::path(getContext()->getPath()) / "stream_engines/filelog/" / DatabaseCatalog::getPathForUUID(storage.uuid);
/// Attach table
if (attach)
{
/// Meta files may be lost; log an error and create the directory
if (!std::filesystem::exists(root_meta_path))
const auto & storage = getStorageID();
auto metadata_path_exist = std::filesystem::exists(metadata_base_path);
auto previous_path = std::filesystem::path(getContext()->getPath()) / ".filelog_storage_metadata" / storage.getDatabaseName() / storage.getTableName();
/// For compatibility with the previous path version.
if (std::filesystem::exists(previous_path) && !metadata_path_exist)
{
/// Create root_meta_path directory when store meta data
std::filesystem::copy(previous_path, metadata_base_path, std::filesystem::copy_options::recursive);
std::filesystem::remove_all(previous_path);
}
/// Meta files may be lost; log an error and create the directory
else if (!metadata_path_exist)
{
/// Create the metadata_base_path directory when storing metadata
LOG_ERROR(log, "Metadata files of table {} are lost.", getStorageID().getTableName());
}
/// Load all meta info to file_infos;
@ -114,14 +122,14 @@ void StorageFileLog::loadMetaFiles(bool attach)
/// Create table, just create meta data directory
else
{
if (std::filesystem::exists(root_meta_path))
if (std::filesystem::exists(metadata_base_path))
{
throw Exception(
ErrorCodes::TABLE_METADATA_ALREADY_EXISTS,
"Metadata files already exist by path: {}, remove them manually if it is intended",
root_meta_path);
metadata_base_path);
}
/// We do not create the root_meta_path directory at creation time, create it at the moment of serializing
/// We do not create the metadata_base_path directory at creation time, create it at the moment of serializing
/// meta files, so that we can avoid unnecessarily creating this directory if creating the table failed.
}
}
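For illustration, a minimal standalone sketch of the compatibility migration performed above, with hypothetical paths (the real code derives them from the storage UUID and the old .filelog_storage_metadata layout):

#include <filesystem>
#include <iostream>

namespace fs = std::filesystem;

/// If metadata from the old layout exists and the new directory does not,
/// copy the files over and drop the old directory.
void migrateMetadata(const fs::path & previous_path, const fs::path & metadata_base_path)
{
    if (fs::exists(previous_path) && !fs::exists(metadata_base_path))
    {
        fs::create_directories(metadata_base_path.parent_path());
        fs::copy(previous_path, metadata_base_path, fs::copy_options::recursive);
        fs::remove_all(previous_path);
    }
}

int main()
{
    /// Hypothetical paths; nothing happens unless the old directory actually exists.
    migrateMetadata("/tmp/.filelog_storage_metadata/db/table", "/tmp/store/abc/metadata");
    std::cout << "migration checked\n";
}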
@ -212,9 +220,9 @@ void StorageFileLog::loadFiles()
void StorageFileLog::serialize() const
{
if (!std::filesystem::exists(root_meta_path))
if (!std::filesystem::exists(metadata_base_path))
{
std::filesystem::create_directories(root_meta_path);
std::filesystem::create_directories(metadata_base_path);
}
for (const auto & [inode, meta] : file_infos.meta_by_inode)
{
@ -236,9 +244,9 @@ void StorageFileLog::serialize() const
void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const
{
if (!std::filesystem::exists(root_meta_path))
if (!std::filesystem::exists(metadata_base_path))
{
std::filesystem::create_directories(root_meta_path);
std::filesystem::create_directories(metadata_base_path);
}
auto full_name = getFullMetaPath(file_meta.file_name);
if (!std::filesystem::exists(full_name))
@ -257,11 +265,11 @@ void StorageFileLog::serialize(UInt64 inode, const FileMeta & file_meta) const
void StorageFileLog::deserialize()
{
if (!std::filesystem::exists(root_meta_path))
if (!std::filesystem::exists(metadata_base_path))
return;
/// In the case of a single file (not a watched directory),
/// the iterated directory always has exactly one file inside.
for (const auto & dir_entry : std::filesystem::directory_iterator{root_meta_path})
for (const auto & dir_entry : std::filesystem::directory_iterator{metadata_base_path})
{
if (!dir_entry.is_regular_file())
{
@ -269,7 +277,7 @@ void StorageFileLog::deserialize()
ErrorCodes::BAD_FILE_TYPE,
"The file {} under {} is not a regular file when deserializing meta files",
dir_entry.path().c_str(),
root_meta_path);
metadata_base_path);
}
ReadBufferFromFile in(dir_entry.path().c_str());
@ -373,8 +381,8 @@ void StorageFileLog::drop()
{
try
{
if (std::filesystem::exists(root_meta_path))
std::filesystem::remove_all(root_meta_path);
if (std::filesystem::exists(metadata_base_path))
std::filesystem::remove_all(metadata_base_path);
}
catch (...)
{
@ -802,6 +810,7 @@ void registerStorageFileLog(StorageFactory & factory)
args.getContext(),
args.columns,
path,
args.relative_data_path,
format,
std::move(filelog_settings),
args.comment,

View File

@ -89,7 +89,7 @@ public:
auto & getFileInfos() { return file_infos; }
String getFullMetaPath(const String & file_name) const { return std::filesystem::path(root_meta_path) / file_name; }
String getFullMetaPath(const String & file_name) const { return std::filesystem::path(metadata_base_path) / file_name; }
String getFullDataPath(const String & file_name) const { return std::filesystem::path(root_data_path) / file_name; }
NamesAndTypesList getVirtuals() const override;
@ -131,6 +131,7 @@ protected:
ContextPtr context_,
const ColumnsDescription & columns_,
const String & path_,
const String & metadata_base_path_,
const String & format_name_,
std::unique_ptr<FileLogSettings> settings,
const String & comment,
@ -145,7 +146,7 @@ private:
/// If the path argument of the table is a regular file, it equals user_files_path;
/// otherwise, it equals user_files_path/ + path_argument/, e.g. path
String root_data_path;
String root_meta_path;
String metadata_base_path;
FileInfos file_infos;

View File

@ -1,3 +1,4 @@
#include <memory>
#include <Storages/Hive/HiveCommon.h>
#if USE_HIVE
@ -5,6 +6,7 @@
#include <thrift/protocol/TBinaryProtocol.h>
#include <thrift/transport/TBufferTransports.h>
#include <thrift/transport/TSocket.h>
#include <Storages/Hive/HiveFile.h>
namespace DB
@ -15,6 +17,18 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
static const unsigned max_hive_metastore_client_connections = 16;
static const int max_hive_metastore_client_retry = 3;
static const UInt64 get_hive_metastore_client_timeout = 1000000;
static const int hive_metastore_client_conn_timeout_ms = 10000;
static const int hive_metastore_client_recv_timeout_ms = 10000;
static const int hive_metastore_client_send_timeout_ms = 10000;
ThriftHiveMetastoreClientPool::ThriftHiveMetastoreClientPool(ThriftHiveMetastoreClientBuilder builder_)
: PoolBase<Object>(max_hive_metastore_client_connections, &Poco::Logger::get("ThriftHiveMetastoreClientPool")), builder(builder_)
{
}
bool HiveMetastoreClient::shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions)
{
@ -40,25 +54,42 @@ bool HiveMetastoreClient::shouldUpdateTableMetadata(
return false;
}
void HiveMetastoreClient::tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func)
{
int i = 0;
String err_msg;
for (; i < max_hive_metastore_client_retry; ++i)
{
auto client = client_pool.get(get_hive_metastore_client_timeout);
try
{
func(client);
}
catch (apache::thrift::transport::TTransportException & e)
{
client.expire();
err_msg = e.what();
continue;
}
break;
}
if (i >= max_hive_metastore_client_retry)
throw Exception(ErrorCodes::NO_HIVEMETASTORE, "Hive Metastore expired because {}", err_msg);
}
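A minimal standalone sketch of the retry-over-a-pool pattern introduced above (Entry and withRetries are illustrative stand-ins, not the real PoolBase API): a broken connection is expired so the pool hands out a fresh one on the next attempt.

#include <functional>
#include <iostream>
#include <stdexcept>
#include <string>

struct Entry
{
    bool healthy = true;
    void expire() { healthy = false; }   /// drop the broken connection from the pool
};

void withRetries(int max_retry, const std::function<void(Entry &)> & func)
{
    std::string err_msg;
    for (int i = 0; i < max_retry; ++i)
    {
        Entry client;                    /// in the real code this comes from client_pool.get(...)
        try
        {
            func(client);
            return;                      /// success
        }
        catch (const std::runtime_error & e)
        {
            client.expire();
            err_msg = e.what();
        }
    }
    throw std::runtime_error("all attempts failed: " + err_msg);
}

int main()
{
    int calls = 0;
    withRetries(3, [&](Entry &)
    {
        if (++calls < 3)
            throw std::runtime_error("transport broken");
        std::cout << "succeeded on attempt " << calls << '\n';
    });
}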
HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(const String & db_name, const String & table_name)
{
LOG_TRACE(log, "Get table metadata for {}.{}", db_name, table_name);
std::lock_guard lock{mutex};
auto table = std::make_shared<Apache::Hadoop::Hive::Table>();
std::vector<Apache::Hadoop::Hive::Partition> partitions;
try
auto client_call = [&](ThriftHiveMetastoreClientPool::Entry & client)
{
client->get_table(*table, db_name, table_name);
/// Query the latest partition info to check for new changes.
client->get_partitions(partitions, db_name, table_name, -1);
}
catch (apache::thrift::transport::TTransportException & e)
{
setExpired();
throw Exception(ErrorCodes::NO_HIVEMETASTORE, "Hive Metastore expired because {}", String(e.what()));
}
};
tryCallHiveClient(client_call);
bool update_cache = shouldUpdateTableMetadata(db_name, table_name, partitions);
String cache_key = getCacheKey(db_name, table_name);
@ -103,23 +134,26 @@ HiveMetastoreClient::HiveTableMetadataPtr HiveMetastoreClient::getTableMetadata(
return metadata;
}
std::shared_ptr<Apache::Hadoop::Hive::Table> HiveMetastoreClient::getHiveTable(const String & db_name, const String & table_name)
{
auto table = std::make_shared<Apache::Hadoop::Hive::Table>();
auto client_call = [&](ThriftHiveMetastoreClientPool::Entry & client)
{
client->get_table(*table, db_name, table_name);
};
tryCallHiveClient(client_call);
return table;
}
void HiveMetastoreClient::clearTableMetadata(const String & db_name, const String & table_name)
{
String cache_key = getCacheKey(db_name, table_name);
std::lock_guard lock{mutex};
HiveTableMetadataPtr metadata = table_metadata_cache.get(cache_key);
if (metadata)
table_metadata_cache.remove(cache_key);
}
void HiveMetastoreClient::setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_)
{
std::lock_guard lock{mutex};
client = client_;
clearExpired();
}
bool HiveMetastoreClient::PartitionInfo::haveSameParameters(const Apache::Hadoop::Hive::Partition & other) const
{
/// Parameters include keys:numRows,numFiles,rawDataSize,totalSize,transient_lastDdlTime
@ -192,53 +226,52 @@ HiveMetastoreClientFactory & HiveMetastoreClientFactory::instance()
return factory;
}
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace Apache::Hadoop::Hive;
HiveMetastoreClientPtr HiveMetastoreClientFactory::getOrCreate(const String & name, ContextPtr context)
{
using namespace apache::thrift;
using namespace apache::thrift::protocol;
using namespace apache::thrift::transport;
using namespace Apache::Hadoop::Hive;
std::lock_guard lock(mutex);
auto it = clients.find(name);
if (it == clients.end() || it->second->isExpired())
if (it == clients.end())
{
/// Connect to hive metastore
Poco::URI hive_metastore_url(name);
const auto & host = hive_metastore_url.getHost();
auto port = hive_metastore_url.getPort();
std::shared_ptr<TSocket> socket = std::make_shared<TSocket>(host, port);
socket->setKeepAlive(true);
socket->setConnTimeout(conn_timeout_ms);
socket->setRecvTimeout(recv_timeout_ms);
socket->setSendTimeout(send_timeout_ms);
std::shared_ptr<TTransport> transport(new TBufferedTransport(socket));
std::shared_ptr<TProtocol> protocol(new TBinaryProtocol(transport));
std::shared_ptr<ThriftHiveMetastoreClient> thrift_client = std::make_shared<ThriftHiveMetastoreClient>(protocol);
try
auto builder = [name]()
{
transport->open();
}
catch (TException & tx)
{
throw Exception("connect to hive metastore:" + name + " failed." + tx.what(), ErrorCodes::BAD_ARGUMENTS);
}
if (it == clients.end())
{
HiveMetastoreClientPtr client = std::make_shared<HiveMetastoreClient>(std::move(thrift_client), context);
clients[name] = client;
return client;
}
else
{
it->second->setClient(std::move(thrift_client));
return it->second;
}
return createThriftHiveMetastoreClient(name);
};
auto client = std::make_shared<HiveMetastoreClient>(builder, context->getGlobalContext());
clients[name] = client;
return client;
}
return it->second;
}
std::shared_ptr<ThriftHiveMetastoreClient> HiveMetastoreClientFactory::createThriftHiveMetastoreClient(const String &name)
{
Poco::URI hive_metastore_url(name);
const auto & host = hive_metastore_url.getHost();
auto port = hive_metastore_url.getPort();
std::shared_ptr<TSocket> socket = std::make_shared<TSocket>(host, port);
socket->setKeepAlive(true);
socket->setConnTimeout(hive_metastore_client_conn_timeout_ms);
socket->setRecvTimeout(hive_metastore_client_recv_timeout_ms);
socket->setSendTimeout(hive_metastore_client_send_timeout_ms);
std::shared_ptr<TTransport> transport = std::make_shared<TBufferedTransport>(socket);
std::shared_ptr<TProtocol> protocol = std::make_shared<TBinaryProtocol>(transport);
std::shared_ptr<ThriftHiveMetastoreClient> thrift_client = std::make_shared<ThriftHiveMetastoreClient>(protocol);
try
{
transport->open();
}
catch (TException & tx)
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Failed to connect to Hive Metastore {}: {}", name, tx.what());
}
return thrift_client;
}
}
#endif

View File

@ -1,5 +1,6 @@
#pragma once
#include <memory>
#include <Common/config.h>
#if USE_HIVE
@ -10,12 +11,32 @@
#include <base/types.h>
#include <Common/LRUCache.h>
#include <Common/PoolBase.h>
#include <Storages/HDFS/HDFSCommon.h>
namespace DB
{
using ThriftHiveMetastoreClientBuilder = std::function<std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>()>;
class ThriftHiveMetastoreClientPool : public PoolBase<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>
{
public:
using Object = Apache::Hadoop::Hive::ThriftHiveMetastoreClient;
using ObjectPtr = std::shared_ptr<Object>;
using Entry = PoolBase<Apache::Hadoop::Hive::ThriftHiveMetastoreClient>::Entry;
explicit ThriftHiveMetastoreClientPool(ThriftHiveMetastoreClientBuilder builder_);
protected:
ObjectPtr allocObject() override
{
return builder();
}
private:
ThriftHiveMetastoreClientBuilder builder;
};
class HiveMetastoreClient : public WithContext
{
public:
@ -26,7 +47,9 @@ public:
UInt64 last_modify_time; /// In ms
size_t size;
FileInfo() = default;
explicit FileInfo() = default;
FileInfo & operator = (const FileInfo &) = default;
FileInfo(const FileInfo &) = default;
FileInfo(const String & path_, UInt64 last_modify_time_, size_t size_)
: path(path_), last_modify_time(last_modify_time_), size(size_)
{
@ -94,17 +117,18 @@ public:
using HiveTableMetadataPtr = std::shared_ptr<HiveMetastoreClient::HiveTableMetadata>;
explicit HiveMetastoreClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_, ContextPtr context_)
: WithContext(context_), client(client_), table_metadata_cache(1000)
explicit HiveMetastoreClient(ThriftHiveMetastoreClientBuilder builder_, ContextPtr context_)
: WithContext(context_)
, table_metadata_cache(1000)
, client_pool(builder_)
{
}
HiveTableMetadataPtr getTableMetadata(const String & db_name, const String & table_name);
// Access Hive table information via the Hive Metastore client
std::shared_ptr<Apache::Hadoop::Hive::Table> getHiveTable(const String & db_name, const String & table_name);
void clearTableMetadata(const String & db_name, const String & table_name);
void setClient(std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client_);
bool isExpired() const { return expired; }
void setExpired() { expired = true; }
void clearExpired() { expired = false; }
private:
static String getCacheKey(const String & db_name, const String & table_name) { return db_name + "." + table_name; }
@ -112,10 +136,10 @@ private:
bool shouldUpdateTableMetadata(
const String & db_name, const String & table_name, const std::vector<Apache::Hadoop::Hive::Partition> & partitions);
std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> client;
void tryCallHiveClient(std::function<void(ThriftHiveMetastoreClientPool::Entry &)> func);
LRUCache<String, HiveTableMetadata> table_metadata_cache;
mutable std::mutex mutex;
std::atomic<bool> expired{false};
ThriftHiveMetastoreClientPool client_pool;
Poco::Logger * log = &Poco::Logger::get("HiveMetastoreClient");
};
@ -128,13 +152,11 @@ public:
HiveMetastoreClientPtr getOrCreate(const String & name, ContextPtr context);
static std::shared_ptr<Apache::Hadoop::Hive::ThriftHiveMetastoreClient> createThriftHiveMetastoreClient(const String & name);
private:
std::mutex mutex;
std::map<String, HiveMetastoreClientPtr> clients;
const int conn_timeout_ms = 10000;
const int recv_timeout_ms = 10000;
const int send_timeout_ms = 10000;
};
}

View File

@ -116,13 +116,12 @@ public:
, compression_method(compression_method_)
, max_block_size(max_block_size_)
, sample_block(std::move(sample_block_))
, to_read_block(sample_block)
, columns_description(getColumnsDescription(sample_block, source_info))
, text_input_field_names(text_input_field_names_)
, format_settings(getFormatSettings(getContext()))
{
/// Initialize to_read_block, which is used to read data from HDFS.
to_read_block = sample_block;
/// Initialize to_read_block, which is used to read data from HDFS.
for (const auto & name_type : source_info->partition_name_types)
{
to_read_block.erase(name_type.name);
@ -207,11 +206,17 @@ public:
/// Enrich with partition columns.
auto types = source_info->partition_name_types.getTypes();
auto names = source_info->partition_name_types.getNames();
auto fields = source_info->hive_files[current_idx]->getPartitionValues();
for (size_t i = 0; i < types.size(); ++i)
{
auto column = types[i]->createColumnConst(num_rows, source_info->hive_files[current_idx]->getPartitionValues()[i]);
auto previous_idx = sample_block.getPositionByName(source_info->partition_name_types.getNames()[i]);
columns.insert(columns.begin() + previous_idx, column->convertToFullColumnIfConst());
// Only add the required partition columns; partition columns are not read from the read buffer.
// The column must be in sample_block, otherwise sample_block.getPositionByName(names[i]) would throw an exception.
if (!sample_block.has(names[i]))
continue;
auto column = types[i]->createColumnConst(num_rows, fields[i]);
auto previous_idx = sample_block.getPositionByName(names[i]);
columns.insert(columns.begin() + previous_idx, column);
}
/// Enrich with virtual columns.
@ -286,14 +291,22 @@ StorageHive::StorageHive(
storage_metadata.setConstraints(constraints_);
storage_metadata.setComment(comment_);
setInMemoryMetadata(storage_metadata);
}
void StorageHive::lazyInitialize()
{
std::lock_guard lock{init_mutex};
if (has_initialized)
return;
auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
auto hive_table_metadata = hive_metastore_client->getTableMetadata(hive_database, hive_table);
auto hive_table_metadata = hive_metastore_client->getHiveTable(hive_database, hive_table);
hdfs_namenode_url = getNameNodeUrl(hive_table_metadata->getTable()->sd.location);
table_schema = hive_table_metadata->getTable()->sd.cols;
hdfs_namenode_url = getNameNodeUrl(hive_table_metadata->sd.location);
table_schema = hive_table_metadata->sd.cols;
FileFormat hdfs_file_format = IHiveFile::toFileFormat(hive_table_metadata->getTable()->sd.inputFormat);
FileFormat hdfs_file_format = IHiveFile::toFileFormat(hive_table_metadata->sd.inputFormat);
switch (hdfs_file_format)
{
case FileFormat::TEXT:
@ -331,6 +344,7 @@ StorageHive::StorageHive(
}
initMinMaxIndexExpression();
has_initialized = true;
}
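The new lazyInitialize() defers the Hive metastore round-trip until the first read and guards it with init_mutex plus a has_initialized flag. A standalone sketch of that pattern (illustrative class, not StorageHive itself):
#include <iostream>
#include <mutex>

class LazyTable
{
public:
    void read()
    {
        lazyInitialize();  // the first caller pays for the metadata fetch
        // ... actual reading would go here ...
    }

private:
    void lazyInitialize()
    {
        std::lock_guard lock(init_mutex);
        if (has_initialized)
            return;
        std::cout << "fetching table metadata once\n";  // stands in for the metastore round-trip
        has_initialized = true;
    }

    std::mutex init_mutex;
    bool has_initialized = false;
};

int main()
{
    LazyTable table;
    table.read();  // fetches once
    table.read();  // already initialized, nothing fetched
}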
void StorageHive::initMinMaxIndexExpression()
@ -542,7 +556,34 @@ HiveFilePtr StorageHive::createHiveFileIfNeeded(
}
return hive_file;
}
bool StorageHive::isColumnOriented() const
{
return format_name == "Parquet" || format_name == "ORC";
}
void StorageHive::getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const
{
if (!isColumnOriented())
sample_block = header_block;
UInt32 erased_columns = 0;
for (const auto & column : partition_columns)
{
if (sample_block.has(column))
erased_columns++;
}
if (erased_columns == sample_block.columns())
{
for (size_t i = 0; i < header_block.columns(); ++i)
{
const auto & col = header_block.getByPosition(i);
if (!partition_columns.count(col.name))
{
sample_block.insert(col);
break;
}
}
}
}
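getActualColumnsToRead() keeps only the requested columns for column-oriented formats (Parquet/ORC), but if every requested column is a partition column it pulls one physical column back from the header so the file reader still has something to read. A simplified sketch of that rule, using plain name lists in place of Block (names below are illustrative):
#include <algorithm>
#include <cassert>
#include <set>
#include <string>
#include <vector>

using Names = std::vector<std::string>;

Names actualColumnsToRead(Names requested, const Names & header,
                          const std::set<std::string> & partition_columns, bool column_oriented)
{
    if (!column_oriented)
        return header;  // row formats have to read every column anyway

    size_t partition_only = static_cast<size_t>(std::count_if(requested.begin(), requested.end(),
        [&](const std::string & name) { return partition_columns.count(name) > 0; }));

    if (partition_only == requested.size())  // nothing physical requested
    {
        for (const auto & name : header)
        {
            if (!partition_columns.count(name))
            {
                requested.push_back(name);  // keep exactly one real column to drive the reader
                break;
            }
        }
    }
    return requested;
}

int main()
{
    Names header{"id", "score", "day"};
    std::set<std::string> partitions{"day"};
    auto columns = actualColumnsToRead({"day"}, header, partitions, /*column_oriented=*/ true);
    assert(columns.size() == 2 && columns[1] == "id");  // "day" plus one physical column
}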
Pipe StorageHive::read(
const Names & column_names,
const StorageMetadataPtr & metadata_snapshot,
@ -552,6 +593,8 @@ Pipe StorageHive::read(
size_t max_block_size,
unsigned num_streams)
{
lazyInitialize();
HDFSBuilderWrapper builder = createHDFSBuilder(hdfs_namenode_url, context_->getGlobalContext()->getConfigRef());
HDFSFSPtr fs = createHDFSFS(builder.get());
auto hive_metastore_client = HiveMetastoreClientFactory::instance().getOrCreate(hive_metastore_url, getContext());
@ -606,14 +649,20 @@ Pipe StorageHive::read(
sources_info->table_name = hive_table;
sources_info->hive_metastore_client = hive_metastore_client;
sources_info->partition_name_types = partition_name_types;
const auto & header_block = metadata_snapshot->getSampleBlock();
Block sample_block;
for (const auto & column : column_names)
{
sample_block.insert(header_block.getByName(column));
if (column == "_path")
sources_info->need_path_column = true;
if (column == "_file")
sources_info->need_file_column = true;
}
getActualColumnsToRead(sample_block, header_block, NameSet{partition_names.begin(), partition_names.end()});
if (num_streams > sources_info->hive_files.size())
num_streams = sources_info->hive_files.size();
@ -625,7 +674,7 @@ Pipe StorageHive::read(
hdfs_namenode_url,
format_name,
compression_method,
metadata_snapshot->getSampleBlock(),
sample_block,
context_,
max_block_size,
text_input_field_names));

View File

@ -36,7 +36,7 @@ public:
ContextPtr /* query_context */,
const StorageMetadataPtr & /* metadata_snapshot */) const override
{
return false;
return true;
}
@ -53,6 +53,8 @@ public:
NamesAndTypesList getVirtuals() const override;
bool isColumnOriented() const override;
protected:
friend class StorageHiveSource;
StorageHive(
@ -88,12 +90,17 @@ private:
HiveFilePtr
createHiveFileIfNeeded(const FileInfo & file_info, const FieldVector & fields, SelectQueryInfo & query_info, ContextPtr context_);
void getActualColumnsToRead(Block & sample_block, const Block & header_block, const NameSet & partition_columns) const;
String hive_metastore_url;
/// Hive database and table
String hive_database;
String hive_table;
std::mutex init_mutex;
bool has_initialized = false;
/// Hive table meta
std::vector<Apache::Hadoop::Hive::FieldSchema> table_schema;
Names text_input_field_names; /// Defines the schema of the Hive file; only used when the file format is TEXT
@ -116,6 +123,8 @@ private:
std::shared_ptr<HiveSettings> storage_settings;
Poco::Logger * log = &Poco::Logger::get("StorageHive");
void lazyInitialize();
};
}

View File

@ -448,7 +448,7 @@ KeyCondition::KeyCondition(
{
for (size_t i = 0, size = key_column_names.size(); i < size; ++i)
{
std::string name = key_column_names[i];
const auto & name = key_column_names[i];
if (!key_columns.count(name))
key_columns[name] = i;
}
@ -1999,7 +1999,7 @@ BoolMask KeyCondition::checkInHyperrectangle(
if (!element.set_index)
throw Exception("Set for IN is not created yet", ErrorCodes::LOGICAL_ERROR);
rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types));
rpn_stack.emplace_back(element.set_index->checkInRange(hyperrectangle, data_types, single_point));
if (element.function == RPNElement::FUNCTION_NOT_IN_SET)
rpn_stack.back() = !rpn_stack.back();
}

View File

@ -126,13 +126,9 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
if (ctx->disk->exists(local_new_part_tmp_path))
throw Exception("Directory " + fullPath(ctx->disk, local_new_part_tmp_path) + " already exists", ErrorCodes::DIRECTORY_ALREADY_EXISTS);
{
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.emplace(local_tmp_part_basename);
}
global_ctx->data->temporary_parts.add(local_tmp_part_basename);
SCOPE_EXIT(
std::lock_guard lock(global_ctx->mutator->tmp_parts_lock);
global_ctx->mutator->tmp_parts.erase(local_tmp_part_basename);
global_ctx->data->temporary_parts.remove(local_tmp_part_basename);
);
global_ctx->all_column_names = global_ctx->metadata_snapshot->getColumns().getNamesOfPhysical();

View File

@ -1386,7 +1386,7 @@ static bool isOldPartDirectory(const DiskPtr & disk, const String & directory_pa
}
size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds)
size_t MergeTreeData::clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds)
{
/// If the method is already called from another thread, then we don't need to do anything.
std::unique_lock lock(clear_old_temporary_directories_mutex, std::defer_lock);
@ -1418,9 +1418,9 @@ size_t MergeTreeData::clearOldTemporaryDirectories(const MergeTreeDataMergerMuta
{
if (disk->isDirectory(it->path()) && isOldPartDirectory(disk, it->path(), deadline))
{
if (merger_mutator.hasTemporaryPart(basename))
if (temporary_parts.contains(basename))
{
LOG_WARNING(log, "{} is an active destination for one of merge/mutation (consider increasing temporary_directories_lifetime setting)", full_path);
LOG_WARNING(log, "{} is in use (by merge/mutation/INSERT) (consider increasing temporary_directories_lifetime setting)", full_path);
continue;
}
else

View File

@ -3,30 +3,31 @@
#include <Common/SimpleIncrement.h>
#include <Common/MultiVersion.h>
#include <Storages/IStorage.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/DataDestinationType.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromFile.h>
#include <IO/ReadBufferFromFile.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Processors/Merges/Algorithms/Graphite.h>
#include <Storages/MergeTree/BackgroundJobsAssignee.h>
#include <Storages/MergeTree/MergeTreeIndices.h>
#include <Storages/MergeTree/MergeTreePartInfo.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/MergeTree/MergeTreeMutationStatus.h>
#include <Storages/MergeTree/MergeList.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/IndicesDescription.h>
#include <Storages/MergeTree/MergeTreePartsMover.h>
#include <Storages/MergeTree/MergeTreeWriteAheadLog.h>
#include <Storages/MergeTree/PinnedPartUUIDs.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Storages/MergeTree/TemporaryParts.h>
#include <Storages/IndicesDescription.h>
#include <Storages/DataDestinationType.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/PartitionCommands.h>
#include <Storages/MergeTree/ZeroCopyLock.h>
#include <Interpreters/PartLog.h>
#include <Disks/StoragePolicy.h>
#include <boost/multi_index_container.hpp>
@ -566,7 +567,7 @@ public:
/// Delete all directories which names begin with "tmp"
/// Must be called with locked lockForShare() because it's using relative_data_path.
size_t clearOldTemporaryDirectories(const MergeTreeDataMergerMutator & merger_mutator, size_t custom_directories_lifetime_seconds);
size_t clearOldTemporaryDirectories(size_t custom_directories_lifetime_seconds);
size_t clearEmptyParts();
@ -906,7 +907,6 @@ public:
mutable std::mutex currently_submerging_emerging_mutex;
protected:
friend class IMergeTreeDataPart;
friend class MergeTreeDataMergerMutator;
friend struct ReplicatedMergeTreeTableMetadata;
@ -1200,6 +1200,8 @@ private:
/// Create zero-copy exclusive lock for part and disk. Useful for coordination of
/// distributed operations which can lead to data duplication. Implemented only in ReplicatedMergeTree.
virtual std::optional<ZeroCopyLock> tryCreateZeroCopyExclusiveLock(const String &, const DiskPtr &) { return std::nullopt; }
TemporaryParts temporary_parts;
};
/// RAII struct to record big parts that are submerging or emerging.

View File

@ -782,10 +782,4 @@ ExecuteTTLType MergeTreeDataMergerMutator::shouldExecuteTTL(const StorageMetadat
}
bool MergeTreeDataMergerMutator::hasTemporaryPart(const std::string & basename) const
{
std::lock_guard lock(tmp_parts_lock);
return tmp_parts.contains(basename);
}
}

View File

@ -192,26 +192,6 @@ private:
ITTLMergeSelector::PartitionIdToTTLs next_recompress_ttl_merge_times_by_partition;
/// Performing TTL merges independently for each partition guarantees that
/// there is only a limited number of TTL merges and no partition stores data, that is too stale
public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
///
/// NOTE: that it accept basename (i.e. dirname), not the path,
/// since later requires canonical form.
bool hasTemporaryPart(const std::string & basename) const;
private:
/// Set of active temporary paths that is used as the destination.
/// List of such paths is required to avoid trying to remove them during cleanup.
///
/// NOTE: It is pretty short, so use STL is fine.
std::unordered_set<std::string> tmp_parts;
/// Lock for "tmp_parts".
///
/// NOTE: mutable is required to mark hasTemporaryPath() const
mutable std::mutex tmp_parts_lock;
};

View File

@ -94,6 +94,7 @@ bool MutatePlainMergeTreeTask::executeStep()
{
storage.updateMutationEntriesErrors(future_part, false, getCurrentExceptionMessage(false));
write_part_log(ExecutionStatus::fromCurrentException());
tryLogCurrentException(__PRETTY_FUNCTION__);
return false;
}
}

View File

@ -64,7 +64,7 @@ void ReplicatedMergeTreeCleanupThread::iterate()
/// Both use relative_data_path which changes during rename, so we
/// do it under share lock
storage.clearOldWriteAheadLogs();
storage.clearOldTemporaryDirectories(storage.merger_mutator, storage.getSettings()->temporary_directories_lifetime.totalSeconds());
storage.clearOldTemporaryDirectories(storage.getSettings()->temporary_directories_lifetime.totalSeconds());
}
/// This is loose condition: no problem if we actually had lost leadership at this moment

View File

@ -0,0 +1,24 @@
#include <Storages/MergeTree/TemporaryParts.h>
namespace DB
{
bool TemporaryParts::contains(const std::string & basename) const
{
std::lock_guard lock(mutex);
return parts.contains(basename);
}
void TemporaryParts::add(std::string basename)
{
std::lock_guard lock(mutex);
parts.emplace(std::move(basename));
}
void TemporaryParts::remove(const std::string & basename)
{
std::lock_guard lock(mutex);
parts.erase(basename);
}
}

View File

@ -0,0 +1,33 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <mutex>
#include <string>
#include <unordered_set>
namespace DB
{
/// Manages the set of active temporary paths that should not be cleaned up by the background thread.
class TemporaryParts : private boost::noncopyable
{
private:
/// To add const qualifier for contains()
mutable std::mutex mutex;
/// NOTE: It is pretty short, so using the STL is fine.
std::unordered_set<std::string> parts;
public:
/// Returns true if passed part name is active.
/// (is the destination for one of active mutation/merge).
///
/// NOTE: It accepts the basename (i.e. dirname), not the full path,
/// since the latter requires canonical form.
bool contains(const std::string & basename) const;
void add(std::string basename);
void remove(const std::string & basename);
};
}
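A standalone sketch of how TemporaryParts is intended to be used (the RAII holder below is only an analogue of the SCOPE_EXIT in MergeTask and is not part of this commit): a merge registers its tmp_* basename for its lifetime, and the cleanup thread skips any directory whose basename is still registered.
#include <cassert>
#include <mutex>
#include <string>
#include <unordered_set>

class TempParts  // same interface as TemporaryParts above, std-only
{
public:
    bool contains(const std::string & basename) const
    {
        std::lock_guard lock(mutex);
        return parts.count(basename) > 0;
    }
    void add(std::string basename)
    {
        std::lock_guard lock(mutex);
        parts.emplace(std::move(basename));
    }
    void remove(const std::string & basename)
    {
        std::lock_guard lock(mutex);
        parts.erase(basename);
    }
private:
    mutable std::mutex mutex;  // contains() is const but still has to lock
    std::unordered_set<std::string> parts;
};

// RAII analogue of the SCOPE_EXIT in MergeTask: registered for the lifetime of a merge.
struct TempPartHolder
{
    TempPartHolder(TempParts & parts_, std::string name_) : parts(parts_), name(std::move(name_)) { parts.add(name); }
    ~TempPartHolder() { parts.remove(name); }
    TempParts & parts;
    std::string name;
};

int main()
{
    TempParts registry;
    {
        TempPartHolder holder(registry, "tmp_merge_all_1_2_1");
        assert(registry.contains("tmp_merge_all_1_2_1"));  // cleanup thread would skip this directory
    }
    assert(!registry.contains("tmp_merge_all_1_2_1"));  // merge finished, safe to remove
}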

View File

@ -1000,7 +1000,8 @@ void StorageBuffer::reschedule()
size_t min = std::max<ssize_t>(min_thresholds.time - time_passed, 1);
size_t max = std::max<ssize_t>(max_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min(min, max) * 1000);
size_t flush = std::max<ssize_t>(flush_thresholds.time - time_passed, 1);
flush_handle->scheduleAfter(std::min({min, max, flush}) * 1000);
}
void StorageBuffer::checkAlterIsPossible(const AlterCommands & commands, ContextPtr local_context) const
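The reschedule() change above makes the next wake-up honor the flush threshold as well, not only the min/max thresholds, each clamped to at least one second. A tiny sketch of the computation (free function with illustrative names):
#include <algorithm>
#include <cstdint>
#include <iostream>

int64_t nextDelaySeconds(int64_t min_time, int64_t max_time, int64_t flush_time, int64_t time_passed)
{
    int64_t min   = std::max<int64_t>(min_time - time_passed, 1);
    int64_t max   = std::max<int64_t>(max_time - time_passed, 1);
    int64_t flush = std::max<int64_t>(flush_time - time_passed, 1);
    return std::min({min, max, flush});
}

int main()
{
    // min=60s, max=600s, flush=5s, 2s already passed: wake up in 3s instead of 58s.
    std::cout << nextDelaySeconds(60, 600, 5, 2) << "\n";
}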

View File

@ -56,6 +56,8 @@
#include <Interpreters/getClusterName.h>
#include <Interpreters/getTableExpressions.h>
#include <Functions/IFunction.h>
#include <TableFunctions/TableFunctionView.h>
#include <TableFunctions/TableFunctionFactory.h>
#include <Processors/Executors/PushingPipelineExecutor.h>
#include <Processors/QueryPlan/QueryPlan.h>
@ -118,6 +120,7 @@ namespace ErrorCodes
extern const int ALTER_OF_COLUMN_IS_FORBIDDEN;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
extern const int TOO_LARGE_DISTRIBUTED_DEPTH;
}
namespace ActionLocks
@ -705,6 +708,9 @@ SinkToStoragePtr StorageDistributed::write(const ASTPtr &, const StorageMetadata
QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuery & query, ContextPtr local_context)
{
const Settings & settings = local_context->getSettingsRef();
if (settings.max_distributed_depth && local_context->getClientInfo().distributed_depth >= settings.max_distributed_depth)
throw Exception("Maximum distributed depth exceeded", ErrorCodes::TOO_LARGE_DISTRIBUTED_DEPTH);
std::shared_ptr<StorageDistributed> storage_src;
auto & select = query.select->as<ASTSelectWithUnionQuery &>();
auto new_query = std::dynamic_pointer_cast<ASTInsertQuery>(query.clone());
@ -719,28 +725,60 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
storage_src = std::dynamic_pointer_cast<StorageDistributed>(joined_tables.getLeftTableStorage());
if (storage_src)
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
/// Unwrap view() function.
if (storage_src->remote_table_function_ptr)
{
const TableFunctionPtr src_table_function =
TableFunctionFactory::instance().get(storage_src->remote_table_function_ptr, local_context);
const TableFunctionView * view_function =
assert_cast<const TableFunctionView *>(src_table_function.get());
new_query->select = view_function->getSelectQuery().clone();
}
else
{
const auto select_with_union_query = std::make_shared<ASTSelectWithUnionQuery>();
select_with_union_query->list_of_selects = std::make_shared<ASTExpressionList>();
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
auto new_select_query = std::dynamic_pointer_cast<ASTSelectQuery>(select_query->clone());
select_with_union_query->list_of_selects->children.push_back(new_select_query);
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_select_query->replaceDatabaseAndTable(storage_src->getRemoteDatabaseName(), storage_src->getRemoteTableName());
new_query->select = select_with_union_query;
new_query->select = select_with_union_query;
}
}
}
}
}
if (!storage_src || storage_src->getClusterName() != getClusterName())
const Cluster::AddressesWithFailover & src_addresses = storage_src ? storage_src->getCluster()->getShardsAddresses() : Cluster::AddressesWithFailover{};
const Cluster::AddressesWithFailover & dst_addresses = getCluster()->getShardsAddresses();
/// Compare addresses instead of cluster name, to handle remote()/cluster().
/// (since for remote()/cluster() the getClusterName() is empty string)
if (src_addresses != dst_addresses)
{
/// The warning should be produced only for root queries:
/// with parallel_distributed_insert_select=1 it would otherwise also be produced
/// for the rewritten insert, since the destination table there is still Distributed.
if (local_context->getClientInfo().distributed_depth == 0)
{
LOG_WARNING(log,
"Parallel distributed INSERT SELECT is not possible "
"(source cluster={} ({} addresses), destination cluster={} ({} addresses))",
storage_src ? storage_src->getClusterName() : "<not a Distributed table>",
src_addresses.size(),
getClusterName(),
dst_addresses.size());
}
return nullptr;
}
if (settings.parallel_distributed_insert_select == PARALLEL_DISTRIBUTED_INSERT_SELECT_ALL)
{
new_query->table_id = StorageID(getRemoteDatabaseName(), getRemoteTableName());
/// Reset table function for INSERT INTO remote()/cluster()
new_query->table_function.reset();
}
const auto & cluster = getCluster();
@ -757,12 +795,15 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
new_query_str = buf.str();
}
ContextMutablePtr query_context = Context::createCopy(local_context);
++query_context->getClientInfo().distributed_depth;
for (size_t shard_index : collections::range(0, shards_info.size()))
{
const auto & shard_info = shards_info[shard_index];
if (shard_info.isLocal())
{
InterpreterInsertQuery interpreter(new_query, local_context);
InterpreterInsertQuery interpreter(new_query, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(interpreter.execute().pipeline);
}
@ -776,7 +817,7 @@ QueryPipelineBuilderPtr StorageDistributed::distributedWrite(const ASTInsertQuer
/// INSERT SELECT query returns empty block
auto remote_query_executor
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, local_context);
= std::make_shared<RemoteQueryExecutor>(shard_info.pool, std::move(connections), new_query_str, Block{}, query_context);
pipelines.emplace_back(std::make_unique<QueryPipelineBuilder>());
pipelines.back()->init(Pipe(std::make_shared<RemoteSource>(remote_query_executor, false, settings.async_socket_for_remote)));
pipelines.back()->setSinks([](const Block & header, QueryPipelineBuilder::StreamType) -> ProcessorPtr
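The distributedWrite() changes above add a recursion guard: each forwarded INSERT carries a distributed_depth counter that is incremented in a copied query context and rejected once max_distributed_depth is reached. A minimal standalone sketch of that pattern (illustrative types, not the ClickHouse Context/Settings API):
#include <cstdint>
#include <iostream>
#include <stdexcept>

struct ClientInfo { uint64_t distributed_depth = 0; };
struct Settings   { uint64_t max_distributed_depth = 5; };
struct Context    { ClientInfo client_info; Settings settings; };

void distributedWrite(const Context & local_context, int fanout)
{
    const auto & settings = local_context.settings;
    if (settings.max_distributed_depth
        && local_context.client_info.distributed_depth >= settings.max_distributed_depth)
        throw std::runtime_error("Maximum distributed depth exceeded");

    Context query_context = local_context;          // analogue of Context::createCopy()
    ++query_context.client_info.distributed_depth;  // everything sent to shards is one level deeper

    for (int shard = 0; shard < fanout; ++shard)
        distributedWrite(query_context, fanout);    // stands in for the remote/local insert execution
}

int main()
{
    try
    {
        distributedWrite(Context{}, /*fanout=*/ 1);
    }
    catch (const std::exception & e)
    {
        std::cout << e.what() << "\n";  // thrown after max_distributed_depth hops
    }
}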

View File

@ -114,8 +114,6 @@ public:
/// Used by InterpreterInsertQuery
std::string getRemoteDatabaseName() const { return remote_database; }
std::string getRemoteTableName() const { return remote_table; }
/// Returns empty string if tables is used by TableFunctionRemote
std::string getClusterName() const { return cluster_name; }
ClusterPtr getCluster() const;
/// Used by InterpreterSystemQuery
@ -201,6 +199,7 @@ private:
std::optional<QueryProcessingStage::Enum> getOptimizedQueryProcessingStage(const SelectQueryInfo & query_info, const Settings & settings) const;
size_t getRandomShardIndex(const Cluster::ShardsInfo & shards);
std::string getClusterName() const { return cluster_name.empty() ? "<remote>" : cluster_name; }
const DistributedSettings & getDistributedSettingsRef() const { return distributed_settings; }

View File

@ -108,7 +108,7 @@ void StorageMergeTree::startup()
/// Temporary directories contain incomplete results of merges (after forced restart)
/// and don't allow to reinitialize them, so delete each of them immediately
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);
/// NOTE background task will also do the above cleanups periodically.
time_after_previous_cleanup_parts.restart();
@ -1062,7 +1062,7 @@ bool StorageMergeTree::scheduleDataProcessingJob(BackgroundJobsAssignee & assign
assignee.scheduleCommonTask(ExecutableLambdaAdapter::create(
[this, share_lock] ()
{
return clearOldTemporaryDirectories(merger_mutator, getSettings()->temporary_directories_lifetime.totalSeconds());
return clearOldTemporaryDirectories(getSettings()->temporary_directories_lifetime.totalSeconds());
}, common_assignee_trigger, getStorageID()), /* need_trigger */ false);
scheduled = true;
}

View File

@ -451,7 +451,7 @@ StorageReplicatedMergeTree::StorageReplicatedMergeTree(
}
/// Temporary directories contain uninitialized results of Merges or Fetches (after forced restart),
/// don't allow to reinitialize them, delete each of them immediately.
clearOldTemporaryDirectories(merger_mutator, 0);
clearOldTemporaryDirectories(0);
clearOldWriteAheadLogs();
}

View File

@ -62,8 +62,19 @@ void StorageSystemAsynchronousInserts::fillData(MutableColumns & res_columns, Co
size_t i = 0;
res_columns[i++]->insert(queryToString(insert_query));
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
/// If query is "INSERT INTO FUNCTION" then table_id is empty.
if (insert_query.table_id)
{
res_columns[i++]->insert(insert_query.table_id.getDatabaseName());
res_columns[i++]->insert(insert_query.table_id.getTableName());
}
else
{
res_columns[i++]->insertDefault();
res_columns[i++]->insertDefault();
}
res_columns[i++]->insert(insert_query.format);
res_columns[i++]->insert(time_in_microseconds(elem->data->first_update));
res_columns[i++]->insert(time_in_microseconds(elem->data->last_update));

View File

@ -4,14 +4,18 @@ if (TARGET ch_contrib::hivemetastore)
add_headers_and_sources(clickhouse_table_functions Hive)
endif ()
list(REMOVE_ITEM clickhouse_table_functions_sources ITableFunction.cpp TableFunctionFactory.cpp)
list(REMOVE_ITEM clickhouse_table_functions_headers ITableFunction.h TableFunctionFactory.h)
list(REMOVE_ITEM clickhouse_table_functions_sources
ITableFunction.cpp
TableFunctionView.cpp
TableFunctionFactory.cpp)
list(REMOVE_ITEM clickhouse_table_functions_headers
ITableFunction.h
TableFunctionView.h
TableFunctionFactory.h)
add_library(clickhouse_table_functions ${clickhouse_table_functions_sources})
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
if (TARGET ch_contrib::hivemetastore)
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms ch_contrib::hivemetastore ch_contrib::hdfs)
else ()
target_link_libraries(clickhouse_table_functions PRIVATE clickhouse_parsers clickhouse_storages_system dbms)
target_link_libraries(clickhouse_table_functions PRIVATE ch_contrib::hivemetastore ch_contrib::hdfs)
endif ()

View File

@ -15,6 +15,12 @@ namespace ErrorCodes
extern const int BAD_ARGUMENTS;
}
const ASTSelectWithUnionQuery & TableFunctionView::getSelectQuery() const
{
return *create.select;
}
void TableFunctionView::parseArguments(const ASTPtr & ast_function, ContextPtr /*context*/)
{
const auto * function = ast_function->as<ASTFunction>();

View File

@ -16,6 +16,9 @@ class TableFunctionView : public ITableFunction
public:
static constexpr auto name = "view";
std::string getName() const override { return name; }
const ASTSelectWithUnionQuery & getSelectQuery() const;
private:
StoragePtr executeImpl(const ASTPtr & ast_function, ContextPtr context, const String & table_name, ColumnsDescription cached_columns) const override;
const char * getStorageTypeName() const override { return "View"; }

View File

@ -231,7 +231,6 @@ CI_CONFIG = {
},
"Stateful tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateful tests (release, DatabaseOrdinary, actions)": {
"required_build": "package_release",
@ -259,7 +258,6 @@ CI_CONFIG = {
},
"Stateless tests (aarch64, actions)": {
"required_build": "package_aarch64",
"force_tests": True,
},
"Stateless tests (release, wide parts enabled, actions)": {
"required_build": "package_release",

View File

@ -8,6 +8,7 @@ from typing import Tuple
from artifactory import ArtifactorySaaSPath # type: ignore
from build_download_helper import dowload_build_with_progress
from env_helper import RUNNER_TEMP
from git_helper import TAG_REGEXP, commit, removeprefix, removesuffix
@ -19,7 +20,7 @@ def getenv(name: str, default: str = None):
raise KeyError(f"Necessary {name} environment is not set")
TEMP_PATH = getenv("TEMP_PATH", ".")
TEMP_PATH = os.path.join(RUNNER_TEMP, "push_to_artifactory")
# One of the following ENVs is necessary
JFROG_API_KEY = getenv("JFROG_API_KEY", "")
JFROG_TOKEN = getenv("JFROG_TOKEN", "")
@ -45,11 +46,11 @@ class Packages:
for name, arch in self.packages
)
self.tgz = tuple("{}-{}.tgz".format(name, version) for name, _ in self.packages)
self.tgz = tuple(f"{name}-{version}.tgz" for name, _ in self.packages)
def arch(self, deb_pkg: str) -> str:
if deb_pkg not in self.deb:
raise ValueError("{} not in {}".format(deb_pkg, self.deb))
raise ValueError(f"{deb_pkg} not in {self.deb}")
return removesuffix(deb_pkg, ".deb").split("_")[-1]
@staticmethod

View File

@ -449,7 +449,7 @@ class TestCase:
else:
os.environ['CLICKHOUSE_URL_PARAMS'] = self.base_url_params + '&' + '&'.join(self.random_settings)
new_options = "--allow_repeated_settings --" + " --".join(self.random_settings)
new_options = " --allow_repeated_settings --" + " --".join(self.random_settings)
os.environ['CLICKHOUSE_CLIENT_OPT'] = self.base_client_options + new_options + ' '
return client_options + new_options

View File

@ -30,7 +30,19 @@ def test_create_parquet_table(started_cluster):
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet;
CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day)
CREATE TABLE default.demo_parquet (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String)) ENGINE = Hive('thrift://hivetest:9083', 'test', 'demo') PARTITION BY(day);
""")
logging.info("create result {}".format(result))
time.sleep(120)
assert result.strip() == ''
def test_create_parquet_table_1(started_cluster):
logging.info('Start testing creating hive table ...')
node = started_cluster.instances['h0_0_0']
node.query("set input_format_parquet_allow_missing_columns = true")
result = node.query("""
DROP TABLE IF EXISTS default.demo_parquet_parts;
CREATE TABLE default.demo_parquet_parts (`id` Nullable(String), `score` Nullable(Int32), `day` Nullable(String), `hour` String) ENGINE = Hive('thrift://hivetest:9083', 'test', 'parquet_demo') PARTITION BY(day, hour);
""")
logging.info("create result {}".format(result))
time.sleep(120)
@ -70,6 +82,17 @@ def test_parquet_groupby(started_cluster):
2021-11-16 2
"""
assert result == expected_result
def test_parquet_in_filter(started_cluster):
logging.info('Start testing IN filter ...')
node = started_cluster.instances['h0_0_0']
result = node.query("""
SELECT count(*) FROM default.demo_parquet_parts where day = '2021-11-05' and hour in ('00')
""")
expected_result = """2
"""
logging.info("query result:{}".format(result))
assert result == expected_result
def test_orc_groupby(started_cluster):
logging.info('Start testing groupby ...')
node = started_cluster.instances['h0_0_0']
@ -143,4 +166,5 @@ def test_cache_read_bytes(started_cluster):
assert result == expected_result
result = node.query("select sum(ProfileEvent_ExternalDataSourceLocalCacheReadBytes) from system.metric_log where ProfileEvent_ExternalDataSourceLocalCacheReadBytes > 0")
logging.info("Read bytes from cache:{}".format(result))
assert result.strip() != '0'

View File

@ -1,3 +1,5 @@
-- Tags: no-random-settings
SET max_rows_to_read = 1000000;
SET read_overflow_mode = 'break';
SELECT concat(toString(number % 256 AS n), '') AS s, n, max(s) FROM system.numbers_mt GROUP BY s, n, n, n, n, n, n, n, n, n ORDER BY s, n;

Some files were not shown because too many files have changed in this diff.