Merge branch 'master' into update_to_gcc_10

This commit is contained in:
alesapin 2020-09-11 12:50:10 +03:00
commit c202ce8e63
242 changed files with 4055 additions and 1105 deletions

View File

@ -17,5 +17,4 @@ ClickHouse is an open-source column-oriented database management system that all
## Upcoming Events
* [ClickHouse Data Integration Virtual Meetup](https://www.eventbrite.com/e/clickhouse-september-virtual-meetup-data-integration-tickets-117421895049) on September 10, 2020.
* [ClickHouse talk at Ya.Subbotnik (in Russian)](https://ya.cc/t/cIBI-3yECj5JF) on September 12, 2020.

View File

@ -1,8 +1,6 @@
#pragma once
#include <algorithm>
#include <cstdint>
#include <cstdlib>
#include <string>
#include <type_traits>

View File

@ -1,9 +1,7 @@
#pragma once
#include <boost/noncopyable.hpp>
#include <mysqlxx/Types.h>
namespace mysqlxx
{
@ -22,6 +20,11 @@ class ResultBase
public:
ResultBase(MYSQL_RES * res_, Connection * conn_, const Query * query_);
ResultBase(const ResultBase &) = delete;
ResultBase & operator=(const ResultBase &) = delete;
ResultBase(ResultBase &&) = default;
ResultBase & operator=(ResultBase &&) = default;
Connection * getConnection() { return conn; }
MYSQL_FIELDS getFields() { return fields; }
unsigned getNumFields() { return num_fields; }

View File

@ -254,7 +254,23 @@ template <> inline std::string Value::get<std::string >() cons
template <> inline LocalDate Value::get<LocalDate >() const { return getDate(); }
template <> inline LocalDateTime Value::get<LocalDateTime >() const { return getDateTime(); }
template <typename T> inline T Value::get() const { return T(*this); }
namespace details
{
// To avoid stack overflow when converting to type with no appropriate c-tor,
// resulting in endless recursive calls from `Value::get<T>()` to `Value::operator T()` to `Value::get<T>()` to ...
template <typename T, typename std::enable_if_t<std::is_constructible_v<T, Value>>>
inline T contructFromValue(const Value & val)
{
return T(val);
}
}
template <typename T>
inline T Value::get() const
{
return details::contructFromValue<T>(*this);
}
inline std::ostream & operator<< (std::ostream & ostr, const Value & x)

View File

@ -18,7 +18,7 @@ ccache --zero-stats ||:
ln -s /usr/lib/x86_64-linux-gnu/libOpenCL.so.1.0.0 /usr/lib/libOpenCL.so ||:
rm -f CMakeCache.txt
cmake --debug-trycompile --verbose=1 -DCMAKE_VERBOSE_MAKEFILE=1 -LA -DCMAKE_BUILD_TYPE=$BUILD_TYPE -DSANITIZE=$SANITIZER $CMAKE_FLAGS ..
ninja $NINJA_FLAGS clickhouse-bundle
ninja -j $(($(nproc) / 2)) $NINJA_FLAGS clickhouse-bundle
mv ./programs/clickhouse* /output
mv ./src/unit_tests_dbms /output
find . -name '*.so' -print -exec mv '{}' /output \;

View File

@ -10,7 +10,7 @@ stage=${stage:-}
# A variable to pass additional flags to CMake.
# Here we explicitly default it to nothing so that bash doesn't complain about
# it being undefined. Also read it as array so that we can pass an empty list
# it being undefined. Also read it as array so that we can pass an empty list
# of additional variable to cmake properly, and it doesn't generate an extra
# empty parameter.
read -ra FASTTEST_CMAKE_FLAGS <<< "${FASTTEST_CMAKE_FLAGS:-}"
@ -127,6 +127,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
#ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -24,6 +24,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -24,6 +24,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -57,6 +57,7 @@ ln -s /usr/share/clickhouse-test/config/access_management.xml /etc/clickhouse-se
ln -s /usr/share/clickhouse-test/config/ints_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/strings_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/decimals_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/executable_dictionary.xml /etc/clickhouse-server/
ln -s /usr/share/clickhouse-test/config/macros.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/disks.xml /etc/clickhouse-server/config.d/
ln -s /usr/share/clickhouse-test/config/secure_ports.xml /etc/clickhouse-server/config.d/

View File

@ -10,42 +10,51 @@ results of a `SELECT`, and to perform `INSERT`s into a file-backed table.
The supported formats are:
| Format | Input | Output |
|-----------------------------------------------------------------|-------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✗ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✗ |
| Format | Input | Output |
|-----------------------------------------------------------------------------------------|-------|--------|
| [TabSeparated](#tabseparated) | ✔ | ✔ |
| [TabSeparatedRaw](#tabseparatedraw) | ✔ | ✔ |
| [TabSeparatedWithNames](#tabseparatedwithnames) | ✔ | ✔ |
| [TabSeparatedWithNamesAndTypes](#tabseparatedwithnamesandtypes) | ✔ | ✔ |
| [Template](#format-template) | ✔ | ✔ |
| [TemplateIgnoreSpaces](#templateignorespaces) | ✔ | ✗ |
| [CSV](#csv) | ✔ | ✔ |
| [CSVWithNames](#csvwithnames) | ✔ | ✔ |
| [CustomSeparated](#format-customseparated) | ✔ | ✔ |
| [Values](#data-format-values) | ✔ | ✔ |
| [Vertical](#vertical) | ✗ | ✔ |
| [VerticalRaw](#verticalraw) | ✗ | ✔ |
| [JSON](#json) | ✗ | ✔ |
| [JSONString](#jsonstring) | ✗ | ✔ |
| [JSONCompact](#jsoncompact) | ✗ | ✔ |
| [JSONCompactString](#jsoncompactstring) | ✗ | ✔ |
| [JSONEachRow](#jsoneachrow) | ✔ | ✔ |
| [JSONEachRowWithProgress](#jsoneachrowwithprogress) | ✗ | ✔ |
| [JSONStringEachRow](#jsonstringeachrow) | ✔ | ✔ |
| [JSONStringEachRowWithProgress](#jsonstringeachrowwithprogress) | ✗ | ✔ |
| [JSONCompactEachRow](#jsoncompacteachrow) | ✔ | ✔ |
| [JSONCompactEachRowWithNamesAndTypes](#jsoncompacteachrowwithnamesandtypes) | ✔ | ✔ |
| [JSONCompactStringEachRow](#jsoncompactstringeachrow) | ✔ | ✔ |
| [JSONCompactStringEachRowWithNamesAndTypes](#jsoncompactstringeachrowwithnamesandtypes) | ✔ | ✔ |
| [TSKV](#tskv) | ✔ | ✔ |
| [Pretty](#pretty) | ✗ | ✔ |
| [PrettyCompact](#prettycompact) | ✗ | ✔ |
| [PrettyCompactMonoBlock](#prettycompactmonoblock) | ✗ | ✔ |
| [PrettyNoEscapes](#prettynoescapes) | ✗ | ✔ |
| [PrettySpace](#prettyspace) | ✗ | ✔ |
| [Protobuf](#protobuf) | ✔ | ✔ |
| [Avro](#data-format-avro) | ✔ | ✔ |
| [AvroConfluent](#data-format-avro-confluent) | ✔ | ✗ |
| [Parquet](#data-format-parquet) | ✔ | ✔ |
| [Arrow](#data-format-arrow) | ✔ | ✔ |
| [ArrowStream](#data-format-arrow-stream) | ✔ | ✔ |
| [ORC](#data-format-orc) | ✔ | ✗ |
| [RowBinary](#rowbinary) | ✔ | ✔ |
| [RowBinaryWithNamesAndTypes](#rowbinarywithnamesandtypes) | ✔ | ✔ |
| [Native](#native) | ✔ | ✔ |
| [Null](#null) | ✗ | ✔ |
| [XML](#xml) | ✗ | ✔ |
| [CapnProto](#capnproto) | ✔ | ✗ |
You can control some format processing parameters with the ClickHouse settings. For more information read the [Settings](../operations/settings/settings.md) section.
@ -392,62 +401,41 @@ SELECT SearchPhrase, count() AS c FROM test.hits GROUP BY SearchPhrase WITH TOTA
"meta":
[
{
"name": "SearchPhrase",
"name": "'hello'",
"type": "String"
},
{
"name": "c",
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
"data":
[
{
"SearchPhrase": "",
"c": "8267016"
"'hello'": "hello",
"multiply(42, number)": "0",
"range(5)": [0,1,2,3,4]
},
{
"SearchPhrase": "bathroom interior design",
"c": "2166"
"'hello'": "hello",
"multiply(42, number)": "42",
"range(5)": [0,1,2,3,4]
},
{
"SearchPhrase": "yandex",
"c": "1655"
},
{
"SearchPhrase": "spring 2014 fashion",
"c": "1549"
},
{
"SearchPhrase": "freeform photos",
"c": "1480"
"'hello'": "hello",
"multiply(42, number)": "84",
"range(5)": [0,1,2,3,4]
}
],
"totals":
{
"SearchPhrase": "",
"c": "8873898"
},
"rows": 3,
"extremes":
{
"min":
{
"SearchPhrase": "",
"c": "1480"
},
"max":
{
"SearchPhrase": "",
"c": "8267016"
}
},
"rows": 5,
"rows_before_limit_at_least": 141137
"rows_before_limit_at_least": 3
}
```
@ -468,63 +456,165 @@ ClickHouse supports [NULL](../sql-reference/syntax.md), which is displayed as `n
See also the [JSONEachRow](#jsoneachrow) format.
## JSONString {#jsonstring}
Differs from JSON only in that data fields are output in strings, not in typed json values.
Example:
```json
{
"meta":
[
{
"name": "'hello'",
"type": "String"
},
{
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
"data":
[
{
"'hello'": "hello",
"multiply(42, number)": "0",
"range(5)": "[0,1,2,3,4]"
},
{
"'hello'": "hello",
"multiply(42, number)": "42",
"range(5)": "[0,1,2,3,4]"
},
{
"'hello'": "hello",
"multiply(42, number)": "84",
"range(5)": "[0,1,2,3,4]"
}
],
"rows": 3,
"rows_before_limit_at_least": 3
}
```
## JSONCompact {#jsoncompact}
## JSONCompactString {#jsoncompactstring}
Differs from JSON only in that data rows are output in arrays, not in objects.
Example:
``` json
// JSONCompact
{
"meta":
[
{
"name": "SearchPhrase",
"name": "'hello'",
"type": "String"
},
{
"name": "c",
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
"data":
[
["", "8267016"],
["bathroom interior design", "2166"],
["yandex", "1655"],
["fashion trends spring 2014", "1549"],
["freeform photo", "1480"]
["hello", "0", [0,1,2,3,4]],
["hello", "42", [0,1,2,3,4]],
["hello", "84", [0,1,2,3,4]]
],
"totals": ["","8873898"],
"rows": 3,
"extremes":
{
"min": ["","1480"],
"max": ["","8267016"]
},
"rows": 5,
"rows_before_limit_at_least": 141137
"rows_before_limit_at_least": 3
}
```
This format is only appropriate for outputting a query result, but not for parsing (retrieving data to insert in a table).
See also the `JSONEachRow` format.
```json
// JSONCompactString
{
"meta":
[
{
"name": "'hello'",
"type": "String"
},
{
"name": "multiply(42, number)",
"type": "UInt64"
},
{
"name": "range(5)",
"type": "Array(UInt8)"
}
],
## JSONEachRow {#jsoneachrow}
"data":
[
["hello", "0", "[0,1,2,3,4]"],
["hello", "42", "[0,1,2,3,4]"],
["hello", "84", "[0,1,2,3,4]"]
],
When using this format, ClickHouse outputs rows as separated, newline-delimited JSON objects, but the data as a whole is not valid JSON.
"rows": 3,
``` json
{"SearchPhrase":"curtain designs","count()":"1064"}
{"SearchPhrase":"baku","count()":"1000"}
{"SearchPhrase":"","count()":"8267016"}
"rows_before_limit_at_least": 3
}
```
When inserting the data, you should provide a separate JSON object for each row.
## JSONEachRow {#jsoneachrow}
## JSONStringEachRow {#jsonstringeachrow}
## JSONCompactEachRow {#jsoncompacteachrow}
## JSONCompactStringEachRow {#jsoncompactstringeachrow}
When using these formats, ClickHouse outputs rows as separated, newline-delimited JSON values, but the data as a whole is not valid JSON.
``` json
{"some_int":42,"some_str":"hello","some_tuple":[1,"a"]} // JSONEachRow
[42,"hello",[1,"a"]] // JSONCompactEachRow
["42","hello","(2,'a')"] // JSONCompactStringsEachRow
```
When inserting the data, you should provide a separate JSON value for each row.
## JSONEachRowWithProgress {#jsoneachrowwithprogress}
## JSONStringEachRowWithProgress {#jsonstringeachrowwithprogress}
Differs from JSONEachRow/JSONStringEachRow in that ClickHouse will also yield progress information as JSON objects.
```json
{"row":{"'hello'":"hello","multiply(42, number)":"0","range(5)":[0,1,2,3,4]}}
{"row":{"'hello'":"hello","multiply(42, number)":"42","range(5)":[0,1,2,3,4]}}
{"row":{"'hello'":"hello","multiply(42, number)":"84","range(5)":[0,1,2,3,4]}}
{"progress":{"read_rows":"3","read_bytes":"24","written_rows":"0","written_bytes":"0","total_rows_to_read":"3"}}
```
## JSONCompactEachRowWithNamesAndTypes {#jsoncompacteachrowwithnamesandtypes}
## JSONCompactStringEachRowWithNamesAndTypes {#jsoncompactstringeachrowwithnamesandtypes}
Differs from JSONCompactEachRow/JSONCompactStringEachRow in that the column names and types are written as the first two rows.
```json
["'hello'", "multiply(42, number)", "range(5)"]
["String", "UInt64", "Array(UInt8)"]
["hello", "0", [0,1,2,3,4]]
["hello", "42", [0,1,2,3,4]]
["hello", "84", [0,1,2,3,4]]
```
### Inserting Data {#inserting-data}

View File

@ -3,6 +3,9 @@ toc_priority: 58
toc_title: External Dictionaries
---
!!! attention "Attention"
`dict_name` parameter must be fully qualified for dictionaries created with DDL queries. Eg. `<database>.<dict_name>`.
# Functions for Working with External Dictionaries {#ext_dict_functions}
For information on connecting and configuring external dictionaries, see [External dictionaries](../../sql-reference/dictionaries/external-dictionaries/external-dicts.md).

View File

@ -1,6 +1,4 @@
---
machine_translated: true
machine_translated_rev: 72537a2d527c63c07aa5d2361a8829f3895cf2bd
toc_priority: 12
toc_title: "\u6559\u7A0B"
---
@ -9,27 +7,27 @@ toc_title: "\u6559\u7A0B"
## 从本教程中可以期待什么? {#what-to-expect-from-this-tutorial}
通过本教程您将学习如何设置一个简单的ClickHouse集群。 它会很小,但容错和可扩展。 然后,我们将使用其中一个示例数据集来填充数据并执行一些演示查询。
通过本教程您将学习如何设置一个简单的ClickHouse集群。 它会很小,但却是容错和可扩展。 然后,我们将使用其中一个示例数据集来填充数据并执行一些演示查询。
## 单节点设置 {#single-node-setup}
为了推迟分布式环境的复杂性我们将首先在单个服务器或虚拟机上部署ClickHouse。 ClickHouse通常是从安装 [黛布](install.md#install-from-deb-packages) 或 [rpm](install.md#from-rpm-packages) 包,但也有 [替代办法](install.md#from-docker-image) 对于不支持它们的操作系统
为了推迟分布式环境的复杂性我们将首先在单个服务器或虚拟机上部署ClickHouse。 ClickHouse通常是从[deb](install.md#install-from-deb-packages) 或 [rpm](install.md#from-rpm-packages) 包安装,但对于不支持它们的操作系统也有 [替代方法](install.md#from-docker-image)
例如,您选择了 `deb` 包和执行:
例如,您选择了`deb` 包安装,执行:
``` bash
{% include 'install/deb.sh' %}
```
我们在安装的软件包中有什么:
在我们安装的软件中包含这些包:
- `clickhouse-client`包含 [ツ环板clientョツ嘉ッツ偲](../interfaces/cli.md) 应用程序交互式ClickHouse控制台客户端。
- `clickhouse-common` 包包含一个ClickHouse可执行文件。
- `clickhouse-server`包含要作为服务器运行ClickHouse的配置文件。
- `clickhouse-client`,包含 [clickhouse-client](../interfaces/cli.md) 应用程序,它是交互式ClickHouse控制台客户端。
- `clickhouse-common`包含一个ClickHouse可执行文件。
- `clickhouse-server`包含要作为服务端运行的ClickHouse配置文件。
服务器配置文件位于 `/etc/clickhouse-server/`. 在进一步讨论之前,请注意 `<path>` 元素in `config.xml`. Path确定数据存储的位置因此应该位于磁盘容量较大的卷上默认值为 `/var/lib/clickhouse/`. 如果你想调整配置,直接编辑并不方便 `config.xml` 文件,考虑到它可能会在未来的软件包更新中被重写。 复盖配置元素的推荐方法是创建 [在配置文件。d目录](../operations/configuration-files.md) 它作为 “patches” 要配置。xml
服务端配置文件位于 `/etc/clickhouse-server/`。 在进一步讨论之前,请注意 `config.xml`文件中的`<path>` 元素. Path决定了数据存储的位置因此该位置应该位于磁盘容量较大的卷上默认值为 `/var/lib/clickhouse/`。 如果你想调整配置,考虑到它可能会在未来的软件包更新中被重写,直接编辑`config.xml` 文件并不方便。 推荐的方法是在[配置文件](../operations/configuration-files.md)目录创建文件作为config.xml文件的“补丁”用以复写配置元素。
你可能已经注意到了, `clickhouse-server` 安装后不会自动启动。 它也不会在更新后自动重新启动。 您启动服务器的方式取决于您的init系统通常情况下它是:
你可能已经注意到了, `clickhouse-server` 安装后不会自动启动。 它也不会在更新后自动重新启动。 您启动服务端的方式取决于您的初始系统,通常情况下是这样:
``` bash
sudo service clickhouse-server start
@ -41,13 +39,13 @@ sudo service clickhouse-server start
sudo /etc/init.d/clickhouse-server start
```
服务器日志的默认位置是 `/var/log/clickhouse-server/`. 服务器已准备好处理客户端连接一旦它记录 `Ready for connections` 消息
服务端日志的默认位置是 `/var/log/clickhouse-server/`。当服务端在日志中记录 `Ready for connections` 消息,即表示服务端已准备好处理客户端连接。
一旦 `clickhouse-server` 正在运行我们可以利用 `clickhouse-client` 连接到服务器并运行一些测试查询,如 `SELECT "Hello, world!";`.
一旦 `clickhouse-server` 启动并运行,我们可以利用 `clickhouse-client` 连接到服务端,并运行一些测试查询,如 `SELECT "Hello, world!";`.
<details markdown="1">
<summary>Clickhouse-客户端的快速提示</summary>
<summary>Clickhouse-client的快速提示</summary>
交互模式:

View File

@ -866,6 +866,8 @@ private:
// will exit. The ping() would be the best match here, but it's
// private, probably for a good reason that the protocol doesn't allow
// pings at any possible moment.
// Don't forget to reset the default database which might have changed.
connection->setDefaultDatabase("");
connection->forceConnected(connection_parameters.timeouts);
if (text.size() > 4 * 1024)
@ -1103,7 +1105,9 @@ private:
{
last_exception_received_from_server = std::make_unique<Exception>(getCurrentExceptionMessage(true), getCurrentExceptionCode());
received_exception_from_server = true;
std::cerr << "Error on processing query: " << ast_to_process->formatForErrorMessage() << std::endl << last_exception_received_from_server->message();
fmt::print(stderr, "Error on processing query '{}': {}\n",
ast_to_process->formatForErrorMessage(),
last_exception_received_from_server->message());
}
if (!connection->isConnected())

View File

@ -15,6 +15,7 @@ namespace DB
namespace ErrorCodes
{
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int UNKNOWN_TYPE;
}
@ -86,6 +87,8 @@ namespace
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.convert<std::string>()));
break;
default:
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
}
}

View File

@ -13,6 +13,11 @@
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_TYPE;
}
namespace
{
using ValueType = ExternalResultDescription::ValueType;
@ -79,6 +84,9 @@ namespace
return Poco::Dynamic::Var(std::to_string(LocalDateTime(time_t(field.get<UInt64>())))).convert<String>();
case ValueType::vtUUID:
return Poco::Dynamic::Var(UUID(field.get<UInt128>()).toUnderType().toHexString()).convert<std::string>();
default:
throw Exception("Unsupported value type", ErrorCodes::UNKNOWN_TYPE);
}
__builtin_unreachable();
}

View File

@ -171,6 +171,12 @@ public:
bool isNumeric() const override { return getDictionary().isNumeric(); }
bool lowCardinality() const override { return true; }
/**
* Checks if the dictionary column is Nullable(T).
* So LC(Nullable(T)) would return true, LC(U) -- false.
*/
bool nestedIsNullable() const { return isColumnNullable(*dictionary.getColumnUnique().getNestedColumn()); }
const IColumnUnique & getDictionary() const { return dictionary.getColumnUnique(); }
const ColumnPtr & getDictionaryPtr() const { return dictionary.getColumnUniquePtr(); }
/// IColumnUnique & getUnique() { return static_cast<IColumnUnique &>(*column_unique); }

View File

@ -2,8 +2,6 @@
LIBRARY()
ADDINCL(
contrib/libs/icu/common
contrib/libs/icu/i18n
contrib/libs/pdqsort
)

View File

@ -281,7 +281,7 @@ namespace ErrorCodes
extern const int DICTIONARY_IS_EMPTY = 281;
extern const int INCORRECT_INDEX = 282;
extern const int UNKNOWN_DISTRIBUTED_PRODUCT_MODE = 283;
extern const int UNKNOWN_GLOBAL_SUBQUERIES_METHOD = 284;
extern const int WRONG_GLOBAL_SUBQUERY = 284;
extern const int TOO_FEW_LIVE_REPLICAS = 285;
extern const int UNSATISFIED_QUORUM_FOR_PREVIOUS_WRITE = 286;
extern const int UNKNOWN_FORMAT_VERSION = 287;
@ -507,6 +507,7 @@ namespace ErrorCodes
extern const int CANNOT_DECLARE_RABBITMQ_EXCHANGE = 540;
extern const int CANNOT_CREATE_RABBITMQ_QUEUE_BINDING = 541;
extern const int CANNOT_REMOVE_RABBITMQ_EXCHANGE = 542;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL = 543;
extern const int KEEPER_EXCEPTION = 999;
extern const int POCO_EXCEPTION = 1000;

View File

@ -6,6 +6,7 @@
#include <Common/Exception.h>
#include <Common/formatReadable.h>
#include <common/logger_useful.h>
#include <Common/ProfileEvents.h>
#include <atomic>
#include <cmath>
@ -22,6 +23,10 @@ namespace DB
}
}
namespace ProfileEvents
{
extern const Event QueryMemoryLimitExceeded;
}
static constexpr size_t log_peak_memory_usage_every = 1ULL << 30;
@ -104,6 +109,7 @@ void MemoryTracker::alloc(Int64 size)
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto untrack_lock = blocker.cancel(); // NOLINT
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
std::stringstream message;
message << "Memory tracker";
if (const auto * description = description_ptr.load(std::memory_order_relaxed))
@ -136,6 +142,7 @@ void MemoryTracker::alloc(Int64 size)
/// Prevent recursion. Exception::ctor -> std::string -> new[] -> MemoryTracker::alloc
auto no_track = blocker.cancel(); // NOLINT
ProfileEvents::increment(ProfileEvents::QueryMemoryLimitExceeded);
std::stringstream message;
message << "Memory limit";
if (const auto * description = description_ptr.load(std::memory_order_relaxed))

View File

@ -233,6 +233,7 @@
M(S3WriteRequestsErrors, "Number of non-throttling errors in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsThrottling, "Number of 429 and 503 errors in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(S3WriteRequestsRedirects, "Number of redirects in POST, DELETE, PUT and PATCH requests to S3 storage.") \
M(QueryMemoryLimitExceeded, "Number of times when memory limit exceeded for query.") \
namespace ProfileEvents

View File

@ -84,3 +84,6 @@ target_link_libraries (procfs_metrics_provider_perf PRIVATE clickhouse_common_io
add_executable (average average.cpp)
target_link_libraries (average PRIVATE clickhouse_common_io)
add_executable (shell_command_inout shell_command_inout.cpp)
target_link_libraries (shell_command_inout PRIVATE clickhouse_common_io)

View File

@ -0,0 +1,47 @@
#include <thread>
#include <Common/ShellCommand.h>
#include <Common/Exception.h>
#include <IO/ReadBufferFromFileDescriptor.h>
#include <IO/WriteBufferFromFileDescriptor.h>
#include <IO/copyData.h>
/** This example shows how we can proxy stdin to ShellCommand and obtain stdout in streaming fashion. */
int main(int argc, char ** argv)
try
{
using namespace DB;
if (argc < 2)
{
std::cerr << "Usage: shell_command_inout 'command...' < in > out\n";
return 1;
}
auto command = ShellCommand::execute(argv[1]);
ReadBufferFromFileDescriptor in(STDIN_FILENO);
WriteBufferFromFileDescriptor out(STDOUT_FILENO);
WriteBufferFromFileDescriptor err(STDERR_FILENO);
/// Background thread sends data and foreground thread receives result.
std::thread thread([&]
{
copyData(in, command->in);
command->in.close();
});
copyData(command->out, out);
copyData(command->err, err);
thread.join();
return 0;
}
catch (...)
{
std::cerr << DB::getCurrentExceptionMessage(true) << '\n';
throw;
}

View File

@ -72,9 +72,10 @@ bool CachedCompressedReadBuffer::nextImpl()
}
CachedCompressedReadBuffer::CachedCompressedReadBuffer(
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_)
const std::string & path_, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator_, UncompressedCache * cache_, bool allow_different_codecs_)
: ReadBuffer(nullptr, 0), file_in_creator(std::move(file_in_creator_)), cache(cache_), path(path_), file_pos(0)
{
allow_different_codecs = allow_different_codecs_;
}
void CachedCompressedReadBuffer::seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block)

View File

@ -38,7 +38,7 @@ private:
clockid_t clock_type {};
public:
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_);
CachedCompressedReadBuffer(const std::string & path, std::function<std::unique_ptr<ReadBufferFromFileBase>()> file_in_creator, UncompressedCache * cache_, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -105,13 +105,24 @@ size_t CompressedReadBufferBase::readCompressedData(size_t & size_decompressed,
uint8_t method = ICompressionCodec::readMethod(own_compressed_buffer.data());
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (method != codec->getMethodByte())
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte 0x"
+ getHexUIntLowercase(method)
+ ", previous method byte 0x"
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
size_compressed_without_checksum = ICompressionCodec::readCompressedBlockSize(own_compressed_buffer.data());
size_decompressed = ICompressionCodec::readDecompressedBlockSize(own_compressed_buffer.data());
@ -163,21 +174,32 @@ void CompressedReadBufferBase::decompress(char * to, size_t size_decompressed, s
uint8_t method = ICompressionCodec::readMethod(compressed_buffer);
if (!codec)
{
codec = CompressionCodecFactory::instance().get(method);
}
else if (codec->getMethodByte() != method)
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
{
if (allow_different_codecs)
{
codec = CompressionCodecFactory::instance().get(method);
}
else
{
throw Exception("Data compressed with different methods, given method byte "
+ getHexUIntLowercase(method)
+ ", previous method byte "
+ getHexUIntLowercase(codec->getMethodByte()),
ErrorCodes::CANNOT_DECOMPRESS);
}
}
codec->decompress(compressed_buffer, size_compressed_without_checksum, to);
}
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in)
: compressed_in(in), own_compressed_buffer(0)
CompressedReadBufferBase::CompressedReadBufferBase(ReadBuffer * in, bool allow_different_codecs_)
: compressed_in(in), own_compressed_buffer(0), allow_different_codecs(allow_different_codecs_)
{
}

View File

@ -26,6 +26,9 @@ protected:
/// Don't checksum on decompressing.
bool disable_checksum = false;
/// Allow reading data, compressed by different codecs from one file.
bool allow_different_codecs;
/// Read compressed data into compressed_buffer. Get size of decompressed data from block header. Checksum if need.
/// Returns number of compressed bytes read.
size_t readCompressedData(size_t & size_decompressed, size_t & size_compressed_without_checksum);
@ -34,7 +37,7 @@ protected:
public:
/// 'compressed_in' could be initialized lazily, but before first call of 'readCompressedData'.
CompressedReadBufferBase(ReadBuffer * in = nullptr);
CompressedReadBufferBase(ReadBuffer * in = nullptr, bool allow_different_codecs_ = false);
~CompressedReadBufferBase();
/** Disable checksums.

View File

@ -36,20 +36,22 @@ bool CompressedReadBufferFromFile::nextImpl()
return true;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf)
CompressedReadBufferFromFile::CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0), p_file_in(std::move(buf)), file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}
CompressedReadBufferFromFile::CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size)
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size, bool allow_different_codecs_)
: BufferWithOwnMemory<ReadBuffer>(0)
, p_file_in(createReadBufferFromFileBase(path, estimated_size, aio_threshold, mmap_threshold, buf_size))
, file_in(*p_file_in)
{
compressed_in = &file_in;
allow_different_codecs = allow_different_codecs_;
}

View File

@ -28,10 +28,11 @@ private:
bool nextImpl() override;
public:
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf);
CompressedReadBufferFromFile(std::unique_ptr<ReadBufferFromFileBase> buf, bool allow_different_codecs_ = false);
CompressedReadBufferFromFile(
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold, size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE);
const std::string & path, size_t estimated_size, size_t aio_threshold, size_t mmap_threshold,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, bool allow_different_codecs_ = false);
void seek(size_t offset_in_compressed_file, size_t offset_in_decompressed_block);

View File

@ -36,6 +36,11 @@ ASTPtr CompressionCodecDelta::getCodecDesc() const
return makeASTFunction("Delta", literal);
}
void CompressionCodecDelta::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
namespace
{

View File

@ -14,7 +14,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -339,6 +339,12 @@ ASTPtr CompressionCodecDoubleDelta::getCodecDesc() const
return std::make_shared<ASTIdentifier>("DoubleDelta");
}
void CompressionCodecDoubleDelta::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(data_bytes_size);
}
UInt32 CompressionCodecDoubleDelta::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header

View File

@ -100,7 +100,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -254,6 +254,12 @@ ASTPtr CompressionCodecGorilla::getCodecDesc() const
return std::make_shared<ASTIdentifier>("Gorilla");
}
void CompressionCodecGorilla::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(data_bytes_size);
}
UInt32 CompressionCodecGorilla::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
const auto result = 2 // common header

View File

@ -97,7 +97,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -35,6 +35,11 @@ ASTPtr CompressionCodecLZ4::getCodecDesc() const
return std::make_shared<ASTIdentifier>("LZ4");
}
void CompressionCodecLZ4::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecLZ4::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return LZ4_COMPRESSBOUND(uncompressed_size);

View File

@ -18,6 +18,8 @@ public:
UInt32 getAdditionalSizeAtTheEndOfBuffer() const override { return LZ4::ADDITIONAL_BYTES_AT_END_OF_BUFFER; }
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;

View File

@ -37,6 +37,12 @@ ASTPtr CompressionCodecMultiple::getCodecDesc() const
return result;
}
void CompressionCodecMultiple::updateHash(SipHash & hash) const
{
for (const auto & codec : codecs)
codec->updateHash(hash);
}
UInt32 CompressionCodecMultiple::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
UInt32 compressed_size = uncompressed_size;

View File

@ -19,7 +19,10 @@ public:
static std::vector<uint8_t> getCodecsBytesFromData(const char * source);
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 decompressed_size) const override;

View File

@ -17,6 +17,11 @@ ASTPtr CompressionCodecNone::getCodecDesc() const
return std::make_shared<ASTIdentifier>("NONE");
}
void CompressionCodecNone::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecNone::doCompressData(const char * source, UInt32 source_size, char * dest) const
{
memcpy(dest, source, source_size);

View File

@ -15,7 +15,10 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -646,6 +646,13 @@ ASTPtr CompressionCodecT64::getCodecDesc() const
return makeASTFunction("T64", literal);
}
void CompressionCodecT64::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
hash.update(type_idx);
hash.update(variant);
}
void registerCodecT64(CompressionCodecFactory & factory)
{
auto reg_func = [&](const ASTPtr & arguments, DataTypePtr type) -> CompressionCodecPtr

View File

@ -35,6 +35,8 @@ public:
ASTPtr getCodecDesc() const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * src, UInt32 src_size, char * dst) const override;
void doDecompressData(const char * src, UInt32 src_size, char * dst, UInt32 uncompressed_size) const override;

View File

@ -32,6 +32,11 @@ ASTPtr CompressionCodecZSTD::getCodecDesc() const
return makeASTFunction("ZSTD", literal);
}
void CompressionCodecZSTD::updateHash(SipHash & hash) const
{
getCodecDesc()->updateTreeHash(hash);
}
UInt32 CompressionCodecZSTD::getMaxCompressedDataSize(UInt32 uncompressed_size) const
{
return ZSTD_compressBound(uncompressed_size);

View File

@ -21,7 +21,10 @@ public:
UInt32 getMaxCompressedDataSize(UInt32 uncompressed_size) const override;
void updateHash(SipHash & hash) const override;
protected:
UInt32 doCompressData(const char * source, UInt32 source_size, char * dest) const override;
void doDecompressData(const char * source, UInt32 source_size, char * dest, UInt32 uncompressed_size) const override;

View File

@ -35,6 +35,13 @@ ASTPtr ICompressionCodec::getFullCodecDesc() const
return result;
}
UInt64 ICompressionCodec::getHash() const
{
SipHash hash;
updateHash(hash);
return hash.get64();
}
UInt32 ICompressionCodec::compress(const char * source, UInt32 source_size, char * dest) const
{
assert(source != nullptr && dest != nullptr);

View File

@ -5,6 +5,7 @@
#include <Compression/CompressionInfo.h>
#include <Core/Types.h>
#include <Parsers/IAST.h>
#include <Common/SipHash.h>
namespace DB
@ -36,6 +37,10 @@ public:
/// "CODEC(LZ4,LZ4HC(5))"
ASTPtr getFullCodecDesc() const;
/// Hash, that depends on codec ast and optional parameters like data type
virtual void updateHash(SipHash & hash) const = 0;
UInt64 getHash() const;
/// Compressed bytes from uncompressed source to dest. Dest should preallocate memory
UInt32 compress(const char * source, UInt32 source_size, char * dest) const;

View File

@ -129,7 +129,7 @@ private:
Shift shift;
if (decimal0 && decimal1)
{
auto result_type = decimalResultType(*decimal0, *decimal1, false, false);
auto result_type = decimalResultType<false, false>(*decimal0, *decimal1);
shift.a = static_cast<CompareInt>(result_type.scaleFactorFor(*decimal0, false).value);
shift.b = static_cast<CompareInt>(result_type.scaleFactorFor(*decimal1, false).value);
}

View File

@ -1,9 +1,11 @@
#include "ExternalResultDescription.h"
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypeUUID.h>
#include <DataTypes/DataTypesDecimal.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeEnum.h>
#include <Common/typeid_cast.h>
@ -64,6 +66,14 @@ void ExternalResultDescription::init(const Block & sample_block_)
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeEnum16 *>(type))
types.emplace_back(ValueType::vtString, is_nullable);
else if (typeid_cast<const DataTypeDateTime64 *>(type))
types.emplace_back(ValueType::vtDateTime64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal32> *>(type))
types.emplace_back(ValueType::vtDecimal32, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal64> *>(type))
types.emplace_back(ValueType::vtDecimal64, is_nullable);
else if (typeid_cast<const DataTypeDecimal<Decimal128> *>(type))
types.emplace_back(ValueType::vtDecimal128, is_nullable);
else
throw Exception{"Unsupported type " + type->getName(), ErrorCodes::UNKNOWN_TYPE};
}

View File

@ -26,6 +26,10 @@ struct ExternalResultDescription
vtDate,
vtDateTime,
vtUUID,
vtDateTime64,
vtDecimal32,
vtDecimal64,
vtDecimal128
};
Block sample_block;

99
src/Core/MultiEnum.h Normal file
View File

@ -0,0 +1,99 @@
#pragma once
#include <cstdint>
#include <type_traits>
// Wrapper around enum that can have multiple values (or none) set at once.
template <typename EnumTypeT, typename StorageTypeT = std::uint64_t>
struct MultiEnum
{
using StorageType = StorageTypeT;
using EnumType = EnumTypeT;
MultiEnum() = default;
template <typename ... EnumValues, typename = std::enable_if_t<std::conjunction_v<std::is_same<EnumTypeT, EnumValues>...>>>
explicit MultiEnum(EnumValues ... v)
: MultiEnum((toBitFlag(v) | ... | 0u))
{}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
explicit MultiEnum(ValueType v)
: bitset(v)
{
static_assert(std::is_unsigned_v<ValueType>);
static_assert(std::is_unsigned_v<StorageType> && std::is_integral_v<StorageType>);
}
MultiEnum(const MultiEnum & other) = default;
MultiEnum & operator=(const MultiEnum & other) = default;
bool isSet(EnumType value) const
{
return bitset & toBitFlag(value);
}
void set(EnumType value)
{
bitset |= toBitFlag(value);
}
void unSet(EnumType value)
{
bitset &= ~(toBitFlag(value));
}
void reset()
{
bitset = 0;
}
StorageType getValue() const
{
return bitset;
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
void setValue(ValueType new_value)
{
// Can't set value from any enum avoid confusion
static_assert(!std::is_enum_v<ValueType>);
bitset = new_value;
}
bool operator==(const MultiEnum & other) const
{
return bitset == other.bitset;
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
bool operator==(ValueType other) const
{
// Shouldn't be comparable with any enum to avoid confusion
static_assert(!std::is_enum_v<ValueType>);
return bitset == other;
}
template <typename U>
bool operator!=(U && other) const
{
return !(*this == other);
}
template <typename ValueType, typename = std::enable_if_t<std::is_convertible_v<ValueType, StorageType>>>
friend bool operator==(ValueType left, MultiEnum right)
{
return right == left;
}
template <typename L>
friend bool operator!=(L left, MultiEnum right)
{
return !(right == left);
}
private:
StorageType bitset = 0;
static StorageType toBitFlag(EnumType v) { return StorageType{1} << static_cast<StorageType>(v); }
};

View File

@ -382,6 +382,7 @@ class IColumn;
M(Bool, alter_partition_verbose_result, false, "Output information about affected parts. Currently works only for FREEZE and ATTACH commands.", 0) \
M(Bool, allow_experimental_database_materialize_mysql, false, "Allow to create database with Engine=MaterializeMySQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "Include all metrics, even with zero values", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal' or 'datetime64'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precison are seen as String on ClickHouse's side.", 0) \
\
/** Obsolete settings that do nothing but left for compatibility reasons. Remove each one after half a year of obsolescence. */ \
\

View File

@ -11,6 +11,7 @@ namespace ErrorCodes
extern const int UNKNOWN_DISTRIBUTED_PRODUCT_MODE;
extern const int UNKNOWN_JOIN;
extern const int BAD_ARGUMENTS;
extern const int UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL;
}
@ -91,4 +92,8 @@ IMPLEMENT_SETTING_ENUM_WITH_RENAME(DefaultDatabaseEngine, ErrorCodes::BAD_ARGUME
{{"Ordinary", DefaultDatabaseEngine::Ordinary},
{"Atomic", DefaultDatabaseEngine::Atomic}})
IMPLEMENT_SETTING_MULTI_ENUM(MySQLDataTypesSupport, ErrorCodes::UNKNOWN_MYSQL_DATATYPES_SUPPORT_LEVEL,
{{"decimal", MySQLDataTypesSupport::DECIMAL},
{"datetime64", MySQLDataTypesSupport::DATETIME64}})
}

View File

@ -126,4 +126,15 @@ enum class DefaultDatabaseEngine
};
DECLARE_SETTING_ENUM(DefaultDatabaseEngine)
enum class MySQLDataTypesSupport
{
DECIMAL, // convert MySQL's decimal and number to ClickHouse Decimal when applicable
DATETIME64, // convert MySQL's DATETIME and TIMESTAMP and ClickHouse DateTime64 if precision is > 0 or range is greater that for DateTime.
// ENUM
};
DECLARE_SETTING_MULTI_ENUM(MySQLDataTypesSupport)
}

View File

@ -4,9 +4,11 @@
#include <Poco/URI.h>
#include <Core/Types.h>
#include <Core/Field.h>
#include <Core/MultiEnum.h>
#include <boost/range/adaptor/map.hpp>
#include <chrono>
#include <unordered_map>
#include <string_view>
namespace DB
@ -328,6 +330,113 @@ void SettingFieldEnum<EnumT, Traits>::readBinary(ReadBuffer & in)
throw Exception(msg, ERROR_CODE_FOR_UNEXPECTED_NAME); \
}
// Mostly like SettingFieldEnum, but can have multiple enum values (or none) set at once.
template <typename Enum, typename Traits>
struct SettingFieldMultiEnum
{
using EnumType = Enum;
using ValueType = MultiEnum<Enum>;
using StorageType = typename ValueType::StorageType;
ValueType value;
bool changed = false;
explicit SettingFieldMultiEnum(ValueType v = ValueType{}) : value{v} {}
explicit SettingFieldMultiEnum(EnumType e) : value{e} {}
explicit SettingFieldMultiEnum(StorageType s) : value(s) {}
explicit SettingFieldMultiEnum(const Field & f) : value(parseValueFromString(f.safeGet<const String &>())) {}
operator ValueType() const { return value; }
explicit operator StorageType() const { return value.getValue(); }
explicit operator Field() const { return toString(); }
SettingFieldMultiEnum & operator= (StorageType x) { changed = x != value.getValue(); value.setValue(x); return *this; }
SettingFieldMultiEnum & operator= (ValueType x) { changed = !(x == value); value = x; return *this; }
SettingFieldMultiEnum & operator= (const Field & x) { parseFromString(x.safeGet<const String &>()); return *this; }
String toString() const
{
static const String separator = ",";
String result;
for (StorageType i = 0; i < Traits::getEnumSize(); ++i)
{
const auto v = static_cast<Enum>(i);
if (value.isSet(v))
{
result += Traits::toString(v);
result += separator;
}
}
if (result.size() > 0)
result.erase(result.size() - separator.size());
return result;
}
void parseFromString(const String & str) { *this = parseValueFromString(str); }
void writeBinary(WriteBuffer & out) const;
void readBinary(ReadBuffer & in);
private:
static ValueType parseValueFromString(const std::string_view str)
{
static const String separators=", ";
ValueType result;
//to avoid allocating memory on substr()
const std::string_view str_view{str};
auto value_start = str_view.find_first_not_of(separators);
while (value_start != std::string::npos)
{
auto value_end = str_view.find_first_of(separators, value_start + 1);
if (value_end == std::string::npos)
value_end = str_view.size();
result.set(Traits::fromString(str_view.substr(value_start, value_end - value_start)));
value_start = str_view.find_first_not_of(separators, value_end);
}
return result;
}
};
template <typename EnumT, typename Traits>
void SettingFieldMultiEnum<EnumT, Traits>::writeBinary(WriteBuffer & out) const
{
SettingFieldEnumHelpers::writeBinary(toString(), out);
}
template <typename EnumT, typename Traits>
void SettingFieldMultiEnum<EnumT, Traits>::readBinary(ReadBuffer & in)
{
parseFromString(SettingFieldEnumHelpers::readBinary(in));
}
#define DECLARE_SETTING_MULTI_ENUM(ENUM_TYPE) \
DECLARE_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, ENUM_TYPE)
#define DECLARE_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, NEW_NAME) \
struct SettingField##NEW_NAME##Traits \
{ \
using EnumType = ENUM_TYPE; \
static size_t getEnumSize(); \
static const String & toString(EnumType value); \
static EnumType fromString(const std::string_view & str); \
}; \
\
using SettingField##NEW_NAME = SettingFieldMultiEnum<ENUM_TYPE, SettingField##NEW_NAME##Traits>;
#define IMPLEMENT_SETTING_MULTI_ENUM(ENUM_TYPE, ERROR_CODE_FOR_UNEXPECTED_NAME, ...) \
IMPLEMENT_SETTING_MULTI_ENUM_WITH_RENAME(ENUM_TYPE, ERROR_CODE_FOR_UNEXPECTED_NAME, __VA_ARGS__)
#define IMPLEMENT_SETTING_MULTI_ENUM_WITH_RENAME(NEW_NAME, ERROR_CODE_FOR_UNEXPECTED_NAME, ...) \
IMPLEMENT_SETTING_ENUM_WITH_RENAME(NEW_NAME, ERROR_CODE_FOR_UNEXPECTED_NAME, __VA_ARGS__)\
size_t SettingField##NEW_NAME##Traits::getEnumSize() {\
return std::initializer_list<std::pair<const char*, NEW_NAME>> __VA_ARGS__ .size();\
}
/// Can keep a value of any type. Used for user-defined settings.
struct SettingFieldCustom

View File

@ -0,0 +1,158 @@
#include <gtest/gtest.h>
#include <Core/Types.h>
#include <type_traits>
#include <Core/MultiEnum.h>
namespace
{
using namespace DB;
enum class TestEnum : UInt8
{
// name represents which bit is going to be set
ZERO,
ONE,
TWO,
THREE,
FOUR,
FIVE
};
}
GTEST_TEST(MultiEnum, WithDefault)
{
MultiEnum<TestEnum, UInt8> multi_enum;
ASSERT_EQ(0, multi_enum.getValue());
ASSERT_EQ(0, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WitheEnum)
{
MultiEnum<TestEnum, UInt8> multi_enum(TestEnum::FOUR);
ASSERT_EQ(16, multi_enum.getValue());
ASSERT_EQ(16, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_TRUE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithValue)
{
const MultiEnum<TestEnum> multi_enum(13u); // (1 | (1 << 2 | 1 << 3)
ASSERT_TRUE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::ONE));
ASSERT_TRUE(multi_enum.isSet(TestEnum::TWO));
ASSERT_TRUE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithMany)
{
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::FIVE};
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum.getValue());
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum);
ASSERT_FALSE(multi_enum.isSet(TestEnum::ZERO));
ASSERT_TRUE(multi_enum.isSet(TestEnum::ONE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::TWO));
ASSERT_FALSE(multi_enum.isSet(TestEnum::THREE));
ASSERT_FALSE(multi_enum.isSet(TestEnum::FOUR));
ASSERT_TRUE(multi_enum.isSet(TestEnum::FIVE));
}
GTEST_TEST(MultiEnum, WithCopyConstructor)
{
const MultiEnum<TestEnum> multi_enum_source{TestEnum::ONE, TestEnum::FIVE};
MultiEnum<TestEnum> multi_enum{multi_enum_source};
ASSERT_EQ(1 << 1 | 1 << 5, multi_enum.getValue());
}
GTEST_TEST(MultiEnum, SetAndUnSet)
{
MultiEnum<TestEnum> multi_enum;
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
multi_enum.set(TestEnum::TWO);
ASSERT_EQ(1 << 1| (1 << 2), multi_enum);
multi_enum.unSet(TestEnum::ONE);
ASSERT_EQ(1 << 2, multi_enum);
}
GTEST_TEST(MultiEnum, SetValueOnDifferentTypes)
{
MultiEnum<TestEnum> multi_enum;
multi_enum.setValue(static_cast<UInt8>(1));
ASSERT_EQ(1, multi_enum);
multi_enum.setValue(static_cast<UInt16>(2));
ASSERT_EQ(2, multi_enum);
multi_enum.setValue(static_cast<UInt32>(3));
ASSERT_EQ(3, multi_enum);
multi_enum.setValue(static_cast<UInt64>(4));
ASSERT_EQ(4, multi_enum);
}
// shouldn't compile
//GTEST_TEST(MultiEnum, WithOtherEnumType)
//{
// MultiEnum<TestEnum> multi_enum;
// enum FOO {BAR, FOOBAR};
// MultiEnum<TestEnum> multi_enum2(BAR);
// MultiEnum<TestEnum> multi_enum3(BAR, FOOBAR);
// multi_enum.setValue(FOO::BAR);
// multi_enum == FOO::BAR;
// FOO::BAR == multi_enum;
//}
GTEST_TEST(MultiEnum, SetSameValueMultipleTimes)
{
// Setting same value is idempotent.
MultiEnum<TestEnum> multi_enum;
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
multi_enum.set(TestEnum::ONE);
ASSERT_EQ(1 << 1, multi_enum);
}
GTEST_TEST(MultiEnum, UnSetValuesThatWerentSet)
{
// Unsetting values that weren't set shouldn't change other flags nor aggregate value.
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::THREE};
multi_enum.unSet(TestEnum::TWO);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
multi_enum.unSet(TestEnum::FOUR);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
multi_enum.unSet(TestEnum::FIVE);
ASSERT_EQ(1 << 1 | 1 << 3, multi_enum);
}
GTEST_TEST(MultiEnum, Reset)
{
MultiEnum<TestEnum> multi_enum{TestEnum::ONE, TestEnum::THREE};
multi_enum.reset();
ASSERT_EQ(0, multi_enum);
}

View File

@ -0,0 +1,146 @@
#include <gtest/gtest.h>
#include <Core/SettingsFields.h>
#include <Core/SettingsEnums.h>
#include <Core/Field.h>
namespace
{
using namespace DB;
using SettingMySQLDataTypesSupport = SettingFieldMultiEnum<MySQLDataTypesSupport, SettingFieldMySQLDataTypesSupportTraits>;
}
namespace DB
{
template <typename Enum, typename Traits>
bool operator== (const SettingFieldMultiEnum<Enum, Traits> & setting, const Field & f)
{
return Field(setting) == f;
}
template <typename Enum, typename Traits>
bool operator== (const Field & f, const SettingFieldMultiEnum<Enum, Traits> & setting)
{
return f == Field(setting);
}
}
GTEST_TEST(MySQLDataTypesSupport, WithDefault)
{
// Setting can be default-initialized and that means all values are unset.
const SettingMySQLDataTypesSupport setting;
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ("", setting.toString());
ASSERT_EQ(setting, Field(""));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithDECIMAL)
{
// Setting can be initialized with MySQLDataTypesSupport::DECIMAL
// and this value can be obtained in varios forms with getters.
const SettingMySQLDataTypesSupport setting(MySQLDataTypesSupport::DECIMAL);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, With1)
{
// Setting can be initialized with int value corresponding to DECIMAL
// and rest of the test is the same as for that value.
const SettingMySQLDataTypesSupport setting(1u);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithMultipleValues)
{
// Setting can be initialized with int value corresponding to (DECIMAL | DATETIME64)
const SettingMySQLDataTypesSupport setting(3u);
ASSERT_EQ(3, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, SetString)
{
SettingMySQLDataTypesSupport setting;
setting = String("decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = "datetime64,decimal";
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
// comma with spaces
setting = " datetime64 , decimal ";
ASSERT_FALSE(setting.changed); // false since value is the same as previous one.
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
setting = String(",,,,,,,, ,decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String(",decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,");
ASSERT_FALSE(setting.changed); //since previous value was DECIMAL
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String("");
ASSERT_TRUE(setting.changed);
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("", setting.toString());
ASSERT_EQ(Field(""), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, SetInvalidString)
{
// Setting can be initialized with int value corresponding to (DECIMAL | DATETIME64)
SettingMySQLDataTypesSupport setting;
EXPECT_THROW(setting = String("FOOBAR"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
EXPECT_THROW(setting = String("decimal,datetime64,123"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
EXPECT_NO_THROW(setting = String(", "));
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
}

View File

@ -37,6 +37,7 @@ namespace ErrorCodes
extern const int TYPE_MISMATCH;
extern const int MONGODB_CANNOT_AUTHENTICATE;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int UNKNOWN_TYPE;
}
@ -298,6 +299,8 @@ namespace
ErrorCodes::TYPE_MISMATCH};
break;
}
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}

View File

@ -134,6 +134,7 @@ Block TTLBlockInputStream::readImpl()
removeValuesWithExpiredColumnTTL(block);
updateMovesTTL(block);
updateRecompressionTTL(block);
return block;
}
@ -369,13 +370,12 @@ void TTLBlockInputStream::removeValuesWithExpiredColumnTTL(Block & block)
block.erase(column);
}
void TTLBlockInputStream::updateMovesTTL(Block & block)
void TTLBlockInputStream::updateTTLWithDescriptions(Block & block, const TTLDescriptions & descriptions, TTLInfoMap & ttl_info_map)
{
std::vector<String> columns_to_remove;
for (const auto & ttl_entry : metadata_snapshot->getMoveTTLs())
for (const auto & ttl_entry : descriptions)
{
auto & new_ttl_info = new_ttl_infos.moves_ttl[ttl_entry.result_column];
auto & new_ttl_info = ttl_info_map[ttl_entry.result_column];
if (!block.has(ttl_entry.result_column))
{
columns_to_remove.push_back(ttl_entry.result_column);
@ -395,6 +395,16 @@ void TTLBlockInputStream::updateMovesTTL(Block & block)
block.erase(column);
}
void TTLBlockInputStream::updateMovesTTL(Block & block)
{
updateTTLWithDescriptions(block, metadata_snapshot->getMoveTTLs(), new_ttl_infos.moves_ttl);
}
void TTLBlockInputStream::updateRecompressionTTL(Block & block)
{
updateTTLWithDescriptions(block, metadata_snapshot->getRecompressionTTLs(), new_ttl_infos.recompression_ttl);
}
UInt32 TTLBlockInputStream::getTimestampByIndex(const IColumn * column, size_t ind)
{
if (const ColumnUInt16 * column_date = typeid_cast<const ColumnUInt16 *>(column))

View File

@ -4,6 +4,7 @@
#include <Storages/MergeTree/IMergeTreeDataPart.h>
#include <Core/Block.h>
#include <Interpreters/Aggregator.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <common/DateLUT.h>
@ -75,9 +76,16 @@ private:
/// Finalize agg_result into result_columns
void finalizeAggregates(MutableColumns & result_columns);
/// Execute description expressions on block and update ttl's in
/// ttl_info_map with expression results.
void updateTTLWithDescriptions(Block & block, const TTLDescriptions & descriptions, TTLInfoMap & ttl_info_map);
/// Updates TTL for moves
void updateMovesTTL(Block & block);
/// Update values for recompression TTL using data from block.
void updateRecompressionTTL(Block & block);
UInt32 getTimestampByIndex(const IColumn * column, size_t ind);
bool isTTLExpired(time_t ttl) const;
};

View File

@ -156,38 +156,31 @@ protected:
};
template <typename T, typename U, template <typename> typename DecimalType>
typename std::enable_if_t<(sizeof(T) >= sizeof(U)), DecimalType<T>>
inline decimalResultType(const DecimalType<T> & tx, const DecimalType<U> & ty, bool is_multiply, bool is_divide)
template <bool is_multiply, bool is_division, typename T, typename U, template <typename> typename DecimalType>
inline auto decimalResultType(const DecimalType<T> & tx, const DecimalType<U> & ty)
{
UInt32 scale = (tx.getScale() > ty.getScale() ? tx.getScale() : ty.getScale());
if (is_multiply)
UInt32 scale{};
if constexpr (is_multiply)
scale = tx.getScale() + ty.getScale();
else if (is_divide)
else if constexpr (is_division)
scale = tx.getScale();
return DecimalType<T>(DecimalUtils::maxPrecision<T>(), scale);
else
scale = (tx.getScale() > ty.getScale() ? tx.getScale() : ty.getScale());
if constexpr (sizeof(T) < sizeof(U))
return DecimalType<U>(DecimalUtils::maxPrecision<U>(), scale);
else
return DecimalType<T>(DecimalUtils::maxPrecision<T>(), scale);
}
template <typename T, typename U, template <typename> typename DecimalType>
typename std::enable_if_t<(sizeof(T) < sizeof(U)), const DecimalType<U>>
inline decimalResultType(const DecimalType<T> & tx, const DecimalType<U> & ty, bool is_multiply, bool is_divide)
{
UInt32 scale = (tx.getScale() > ty.getScale() ? tx.getScale() : ty.getScale());
if (is_multiply)
scale = tx.getScale() * ty.getScale();
else if (is_divide)
scale = tx.getScale();
return DecimalType<U>(DecimalUtils::maxPrecision<U>(), scale);
}
template <typename T, typename U, template <typename> typename DecimalType>
inline const DecimalType<T> decimalResultType(const DecimalType<T> & tx, const DataTypeNumber<U> &, bool, bool)
template <bool, bool, typename T, typename U, template <typename> typename DecimalType>
inline const DecimalType<T> decimalResultType(const DecimalType<T> & tx, const DataTypeNumber<U> &)
{
return DecimalType<T>(DecimalUtils::maxPrecision<T>(), tx.getScale());
}
template <typename T, typename U, template <typename> typename DecimalType>
inline const DecimalType<U> decimalResultType(const DataTypeNumber<T> &, const DecimalType<U> & ty, bool, bool)
template <bool, bool, typename T, typename U, template <typename> typename DecimalType>
inline const DecimalType<U> decimalResultType(const DataTypeNumber<T> &, const DecimalType<U> & ty)
{
return DecimalType<U>(DecimalUtils::maxPrecision<U>(), ty.getScale());
}

View File

@ -308,16 +308,30 @@ ReturnType DataTypeNullable::deserializeTextQuoted(IColumn & column, ReadBuffer
const DataTypePtr & nested_data_type)
{
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr] { return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr); },
[&istr]
{
return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr);
},
[&nested_data_type, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsTextQuoted(nested, istr, settings); });
}
void DataTypeNullable::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings) const
{
safeDeserialize(column, *nested_data_type,
[&istr] { return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr); },
[this, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsWholeText(nested, istr, settings); });
deserializeWholeText<void>(column, istr, settings, nested_data_type);
}
template <typename ReturnType>
ReturnType DataTypeNullable::deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings,
const DataTypePtr & nested_data_type)
{
return safeDeserialize<ReturnType>(column, *nested_data_type,
[&istr]
{
return checkStringByFirstCharacterAndAssertTheRestCaseInsensitive("NULL", istr)
|| checkStringByFirstCharacterAndAssertTheRest("ᴺᵁᴸᴸ", istr);
},
[&nested_data_type, &istr, &settings] (IColumn & nested) { nested_data_type->deserializeAsWholeText(nested, istr, settings); });
}
@ -544,6 +558,7 @@ DataTypePtr removeNullable(const DataTypePtr & type)
}
template bool DataTypeNullable::deserializeWholeText<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextEscaped<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextQuoted<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);
template bool DataTypeNullable::deserializeTextCSV<bool>(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);

View File

@ -103,6 +103,8 @@ public:
/// If ReturnType is bool, check for NULL and deserialize value into non-nullable column (and return true) or insert default value of nested type (and return false)
/// If ReturnType is void, deserialize Nullable(T)
template <typename ReturnType = bool>
static ReturnType deserializeWholeText(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextEscaped(IColumn & column, ReadBuffer & istr, const FormatSettings & settings, const DataTypePtr & nested);
template <typename ReturnType = bool>
static ReturnType deserializeTextQuoted(IColumn & column, ReadBuffer & istr, const FormatSettings &, const DataTypePtr & nested);

View File

@ -2,11 +2,16 @@
#include <Core/Field.h>
#include <Core/Types.h>
#include <Core/MultiEnum.h>
#include <Core/SettingsEnums.h>
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/IAST.h>
#include "DataTypeDate.h"
#include "DataTypeDateTime.h"
#include "DataTypeDateTime64.h"
#include "DataTypeEnum.h"
#include "DataTypesDecimal.h"
#include "DataTypeFixedString.h"
#include "DataTypeNullable.h"
#include "DataTypeString.h"
@ -25,52 +30,88 @@ ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type)
return makeASTFunction("Nullable", dataTypeConvertToQuery(typeid_cast<const DataTypeNullable *>(data_type.get())->getNestedType()));
}
DataTypePtr convertMySQLDataType(const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length)
DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support,
const std::string & mysql_data_type,
bool is_nullable,
bool is_unsigned,
size_t length,
size_t precision,
size_t scale)
{
DataTypePtr res;
if (mysql_data_type == "tinyint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt8>();
else
res = std::make_shared<DataTypeInt8>();
}
else if (mysql_data_type == "smallint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt16>();
else
res = std::make_shared<DataTypeInt16>();
}
else if (mysql_data_type == "int" || mysql_data_type == "mediumint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt32>();
else
res = std::make_shared<DataTypeInt32>();
}
else if (mysql_data_type == "bigint")
{
if (is_unsigned)
res = std::make_shared<DataTypeUInt64>();
else
res = std::make_shared<DataTypeInt64>();
}
else if (mysql_data_type == "float")
res = std::make_shared<DataTypeFloat32>();
else if (mysql_data_type == "double")
res = std::make_shared<DataTypeFloat64>();
else if (mysql_data_type == "date")
res = std::make_shared<DataTypeDate>();
else if (mysql_data_type == "datetime" || mysql_data_type == "timestamp")
res = std::make_shared<DataTypeDateTime>();
else if (mysql_data_type == "binary")
res = std::make_shared<DataTypeFixedString>(length);
else
// we expect mysql_data_type to be either "basic_type" or "type_with_params(param1, param2, ...)"
auto data_type = std::string_view(mysql_data_type);
const auto param_start_pos = data_type.find("(");
const auto type_name = data_type.substr(0, param_start_pos);
DataTypePtr res = [&]() -> DataTypePtr {
if (type_name == "tinyint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt8>();
else
return std::make_shared<DataTypeInt8>();
}
if (type_name == "smallint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt16>();
else
return std::make_shared<DataTypeInt16>();
}
if (type_name == "int" || type_name == "mediumint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt32>();
else
return std::make_shared<DataTypeInt32>();
}
if (type_name == "bigint")
{
if (is_unsigned)
return std::make_shared<DataTypeUInt64>();
else
return std::make_shared<DataTypeInt64>();
}
if (type_name == "float")
return std::make_shared<DataTypeFloat32>();
if (type_name == "double")
return std::make_shared<DataTypeFloat64>();
if (type_name == "date")
return std::make_shared<DataTypeDate>();
if (type_name == "binary")
return std::make_shared<DataTypeFixedString>(length);
if (type_name == "datetime" || type_name == "timestamp")
{
if (!type_support.isSet(MySQLDataTypesSupport::DATETIME64))
return std::make_shared<DataTypeDateTime>();
if (type_name == "timestamp" && scale == 0)
{
return std::make_shared<DataTypeDateTime>();
}
else if (type_name == "datetime" || type_name == "timestamp")
{
return std::make_shared<DataTypeDateTime64>(scale);
}
}
if (type_support.isSet(MySQLDataTypesSupport::DECIMAL) && (type_name == "numeric" || type_name == "decimal"))
{
if (precision <= DecimalUtils::maxPrecision<Decimal32>())
return std::make_shared<DataTypeDecimal<Decimal32>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal64>())
return std::make_shared<DataTypeDecimal<Decimal64>>(precision, scale);
else if (precision <= DecimalUtils::maxPrecision<Decimal128>())
return std::make_shared<DataTypeDecimal<Decimal128>>(precision, scale);
}
/// Also String is fallback for all unknown types.
res = std::make_shared<DataTypeString>();
return std::make_shared<DataTypeString>();
}();
if (is_nullable)
res = std::make_shared<DataTypeNullable>(res);
return res;
}

View File

@ -1,17 +1,20 @@
#pragma once
#include <string>
#include <Core/MultiEnum.h>
#include <Parsers/IAST.h>
#include "IDataType.h"
namespace DB
{
enum class MySQLDataTypesSupport;
/// Convert data type to query. for example
/// DataTypeUInt8 -> ASTIdentifier(UInt8)
/// DataTypeNullable(DataTypeUInt8) -> ASTFunction(ASTIdentifier(UInt8))
ASTPtr dataTypeConvertToQuery(const DataTypePtr & data_type);
/// Convert MySQL type to ClickHouse data type.
DataTypePtr convertMySQLDataType(const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length);
DataTypePtr convertMySQLDataType(MultiEnum<MySQLDataTypesSupport> type_support, const std::string & mysql_data_type, bool is_nullable, bool is_unsigned, size_t length, size_t precision, size_t scale);
}

View File

@ -0,0 +1,101 @@
#include <Columns/IColumn.h>
#include <Core/Field.h>
#include <DataTypes/DataTypeFactory.h>
#include <DataTypes/IDataType.h>
#include <DataTypes/getLeastSupertype.h>
#include <DataTypes/getMostSubtype.h>
#include <Formats/FormatSettings.h>
#include <IO/ReadBuffer.h>
#pragma GCC diagnostic ignored "-Wmissing-declarations"
#include <gtest/gtest.h>
#include <string>
#include <vector>
#include <Core/iostream_debug_helpers.h>
namespace std
{
template <typename T>
inline std::ostream& operator<<(std::ostream & ostr, const std::vector<T> & v)
{
ostr << "[";
for (const auto & i : v)
{
ostr << i << ", ";
}
return ostr << "] (" << v.size() << ") items";
}
}
using namespace DB;
struct ParseDataTypeTestCase
{
const char * type_name;
std::vector<String> values;
FieldVector expected_values;
};
std::ostream & operator<<(std::ostream & ostr, const ParseDataTypeTestCase & test_case)
{
return ostr << "ParseDataTypeTestCase{\"" << test_case.type_name << "\", " << test_case.values << "}";
}
class ParseDataTypeTest : public ::testing::TestWithParam<ParseDataTypeTestCase>
{
public:
void SetUp() override
{
const auto & p = GetParam();
data_type = DataTypeFactory::instance().get(p.type_name);
}
DataTypePtr data_type;
};
TEST_P(ParseDataTypeTest, parseStringValue)
{
const auto & p = GetParam();
auto col = data_type->createColumn();
for (const auto & value : p.values)
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type->deserializeAsWholeText(*col, buffer, FormatSettings{});
}
ASSERT_EQ(p.expected_values.size(), col->size()) << "Actual items: " << *col;
for (size_t i = 0; i < col->size(); ++i)
{
ASSERT_EQ(p.expected_values[i], (*col)[i]);
}
}
INSTANTIATE_TEST_SUITE_P(ParseDecimal,
ParseDataTypeTest,
::testing::ValuesIn(
std::initializer_list<ParseDataTypeTestCase>{
{
"Decimal(8, 0)",
{"0", "5", "8", "-5", "-8", "12345678", "-12345678"},
std::initializer_list<Field>{
DecimalField<Decimal32>(0, 0),
DecimalField<Decimal32>(5, 0),
DecimalField<Decimal32>(8, 0),
DecimalField<Decimal32>(-5, 0),
DecimalField<Decimal32>(-8, 0),
DecimalField<Decimal32>(12345678, 0),
DecimalField<Decimal32>(-12345678, 0)
}
}
}
)
);

View File

@ -10,6 +10,7 @@
# include <DataTypes/DataTypesNumber.h>
# include <DataTypes/convertMySQLDataType.h>
# include <Databases/MySQL/DatabaseConnectionMySQL.h>
# include <Databases/MySQL/FetchTablesColumnsList.h>
# include <Formats/MySQLBlockInputStream.h>
# include <IO/Operators.h>
# include <Parsers/ASTCreateQuery.h>
@ -43,31 +44,14 @@ constexpr static const auto suffix = ".remove_flag";
static constexpr const std::chrono::seconds cleaner_sleep_time{30};
static const std::chrono::seconds lock_acquire_timeout{10};
static String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
WriteBufferFromOwnString quote_list_query;
quote_list_query << "(";
for (size_t index = 0; index < quote_list.size(); ++index)
{
if (index)
quote_list_query << ",";
quote_list_query << quote << quote_list[index];
}
quote_list_query << ")";
return quote_list_query.str();
}
DatabaseConnectionMySQL::DatabaseConnectionMySQL(
const Context & global_context_, const String & database_name_, const String & metadata_path_,
DatabaseConnectionMySQL::DatabaseConnectionMySQL(const Context & context, const String & database_name_, const String & metadata_path_,
const ASTStorage * database_engine_define_, const String & database_name_in_mysql_, mysqlxx::Pool && pool)
: IDatabase(database_name_)
, global_context(global_context_.getGlobalContext())
, global_context(context.getGlobalContext())
, metadata_path(metadata_path_)
, database_engine_define(database_engine_define_->clone())
, database_name_in_mysql(database_name_in_mysql_)
, mysql_datatypes_support_level(context.getQueryContext().getSettingsRef().mysql_datatypes_support_level)
, mysql_pool(std::move(pool))
{
empty(); /// test database is works fine.
@ -78,7 +62,7 @@ bool DatabaseConnectionMySQL::empty() const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(global_context);
if (local_tables_cache.empty())
return true;
@ -90,12 +74,12 @@ bool DatabaseConnectionMySQL::empty() const
return true;
}
DatabaseTablesIteratorPtr DatabaseConnectionMySQL::getTablesIterator(const Context &, const FilterByNameFunction & filter_by_table_name)
DatabaseTablesIteratorPtr DatabaseConnectionMySQL::getTablesIterator(const Context & context, const FilterByNameFunction & filter_by_table_name)
{
Tables tables;
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
for (const auto & [table_name, modify_time_and_storage] : local_tables_cache)
if (!remove_or_detach_tables.count(table_name) && (!filter_by_table_name || filter_by_table_name(table_name)))
@ -109,11 +93,11 @@ bool DatabaseConnectionMySQL::isTableExist(const String & name, const Context &
return bool(tryGetTable(name, context));
}
StoragePtr DatabaseConnectionMySQL::tryGetTable(const String & mysql_table_name, const Context &) const
StoragePtr DatabaseConnectionMySQL::tryGetTable(const String & mysql_table_name, const Context & context) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
if (!remove_or_detach_tables.count(mysql_table_name) && local_tables_cache.find(mysql_table_name) != local_tables_cache.end())
return local_tables_cache[mysql_table_name].second;
@ -157,11 +141,11 @@ static ASTPtr getCreateQueryFromStorage(const StoragePtr & storage, const ASTPtr
return create_table_query;
}
ASTPtr DatabaseConnectionMySQL::getCreateTableQueryImpl(const String & table_name, const Context &, bool throw_on_error) const
ASTPtr DatabaseConnectionMySQL::getCreateTableQueryImpl(const String & table_name, const Context & context, bool throw_on_error) const
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(context);
if (local_tables_cache.find(table_name) == local_tables_cache.end())
{
@ -178,7 +162,7 @@ time_t DatabaseConnectionMySQL::getObjectMetadataModificationTime(const String &
{
std::lock_guard<std::mutex> lock(mutex);
fetchTablesIntoLocalCache();
fetchTablesIntoLocalCache(global_context);
if (local_tables_cache.find(table_name) == local_tables_cache.end())
throw Exception("MySQL table " + database_name_in_mysql + "." + table_name + " doesn't exist.", ErrorCodes::UNKNOWN_TABLE);
@ -194,12 +178,12 @@ ASTPtr DatabaseConnectionMySQL::getCreateDatabaseQuery() const
return create_query;
}
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache() const
void DatabaseConnectionMySQL::fetchTablesIntoLocalCache(const Context & context) const
{
const auto & tables_with_modification_time = fetchTablesWithModificationTime();
destroyLocalCacheExtraTables(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time);
fetchLatestTablesStructureIntoCache(tables_with_modification_time, context);
}
void DatabaseConnectionMySQL::destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const
@ -216,7 +200,7 @@ void DatabaseConnectionMySQL::destroyLocalCacheExtraTables(const std::map<String
}
}
void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time) const
void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> &tables_modification_time, const Context & context) const
{
std::vector<String> wait_update_tables_name;
for (const auto & table_modification_time : tables_modification_time)
@ -228,7 +212,7 @@ void DatabaseConnectionMySQL::fetchLatestTablesStructureIntoCache(const std::map
wait_update_tables_name.emplace_back(table_modification_time.first);
}
std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name);
std::map<String, NamesAndTypesList> tables_and_columns = fetchTablesColumnsList(wait_update_tables_name, context);
for (const auto & table_and_columns : tables_and_columns)
{
@ -280,53 +264,16 @@ std::map<String, UInt64> DatabaseConnectionMySQL::fetchTablesWithModificationTim
return tables_with_modification_time;
}
std::map<String, NamesAndTypesList> DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name) const
std::map<String, NamesAndTypesList> DatabaseConnectionMySQL::fetchTablesColumnsList(const std::vector<String> & tables_name, const Context & context) const
{
std::map<String, NamesAndTypesList> tables_and_columns;
const auto & settings = context.getSettingsRef();
if (tables_name.empty())
return tables_and_columns;
Block tables_columns_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeString>(), "column_name" },
{ std::make_shared<DataTypeString>(), "column_type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
};
WriteBufferFromOwnString query;
query << "SELECT "
" TABLE_NAME AS table_name,"
" COLUMN_NAME AS column_name,"
" DATA_TYPE AS column_type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length"
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name_in_mysql
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
const auto & external_table_functions_use_nulls = global_context.getSettings().external_table_functions_use_nulls;
MySQLBlockInputStream result(mysql_pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
String table_name = (*block.getByPosition(0).column)[i].safeGet<String>();
tables_and_columns[table_name].emplace_back((*block.getByPosition(1).column)[i].safeGet<String>(),
convertMySQLDataType(
(*block.getByPosition(2).column)[i].safeGet<String>(),
(*block.getByPosition(3).column)[i].safeGet<UInt64>() &&
external_table_functions_use_nulls,
(*block.getByPosition(4).column)[i].safeGet<UInt64>(),
(*block.getByPosition(5).column)[i].safeGet<UInt64>()));
}
}
return tables_and_columns;
return DB::fetchTablesColumnsList(
mysql_pool,
database_name_in_mysql,
tables_name,
settings.external_table_functions_use_nulls,
mysql_datatypes_support_level);
}
void DatabaseConnectionMySQL::shutdown()

View File

@ -4,17 +4,27 @@
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <Databases/DatabasesCommon.h>
#include <memory>
#include <Parsers/ASTCreateQuery.h>
#include <Common/ThreadPool.h>
#include <Core/MultiEnum.h>
#include <Common/ThreadPool.h>
#include <Databases/DatabasesCommon.h>
#include <Parsers/ASTCreateQuery.h>
#include <atomic>
#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <unordered_set>
#include <vector>
namespace DB
{
class Context;
enum class MySQLDataTypesSupport;
/** Real-time access to table list and table structure from remote MySQL
* It doesn't make any manipulations with filesystem.
* All tables are created by calling code after real-time pull-out structure from remote MySQL
@ -25,7 +35,7 @@ public:
~DatabaseConnectionMySQL() override;
DatabaseConnectionMySQL(
const Context & global_context, const String & database_name, const String & metadata_path,
const Context & context, const String & database_name, const String & metadata_path,
const ASTStorage * database_engine_define, const String & database_name_in_mysql, mysqlxx::Pool && pool);
String getEngineName() const override { return "MySQL"; }
@ -66,6 +76,9 @@ private:
String metadata_path;
ASTPtr database_engine_define;
String database_name_in_mysql;
// Cache setting for later from query context upon creation,
// so column types depend on the settings set at query-level.
MultiEnum<MySQLDataTypesSupport> mysql_datatypes_support_level;
std::atomic<bool> quit{false};
std::condition_variable cond;
@ -81,15 +94,15 @@ private:
void cleanOutdatedTables();
void fetchTablesIntoLocalCache() const;
void fetchTablesIntoLocalCache(const Context & context) const;
std::map<String, UInt64> fetchTablesWithModificationTime() const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name) const;
std::map<String, NamesAndTypesList> fetchTablesColumnsList(const std::vector<String> & tables_name, const Context & context) const;
void destroyLocalCacheExtraTables(const std::map<String, UInt64> & tables_with_modification_time) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time) const;
void fetchLatestTablesStructureIntoCache(const std::map<String, UInt64> & tables_modification_time, const Context & context) const;
ThreadFromGlobalPool thread;
};

View File

@ -0,0 +1,114 @@
#if !defined(ARCADIA_BUILD)
# include "config_core.h"
#endif
#if USE_MYSQL
#include <Core/Block.h>
#include <Databases/MySQL/FetchTablesColumnsList.h>
#include <DataTypes/convertMySQLDataType.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Formats/MySQLBlockInputStream.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
#include <memory>
namespace
{
using namespace DB;
String toQueryStringWithQuote(const std::vector<String> & quote_list)
{
WriteBufferFromOwnString quote_list_query;
quote_list_query << "(";
for (size_t index = 0; index < quote_list.size(); ++index)
{
if (index)
quote_list_query << ",";
quote_list_query << quote << quote_list[index];
}
quote_list_query << ")";
return quote_list_query.str();
}
}
namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
MultiEnum<MySQLDataTypesSupport> type_support)
{
std::map<String, NamesAndTypesList> tables_and_columns;
if (tables_name.empty())
return tables_and_columns;
Block tables_columns_sample_block
{
{ std::make_shared<DataTypeString>(), "table_name" },
{ std::make_shared<DataTypeString>(), "column_name" },
{ std::make_shared<DataTypeString>(), "column_type" },
{ std::make_shared<DataTypeUInt8>(), "is_nullable" },
{ std::make_shared<DataTypeUInt8>(), "is_unsigned" },
{ std::make_shared<DataTypeUInt64>(), "length" },
{ std::make_shared<DataTypeUInt64>(), "precision" },
{ std::make_shared<DataTypeUInt64>(), "scale" },
};
WriteBufferFromOwnString query;
query << "SELECT "
" TABLE_NAME AS table_name,"
" COLUMN_NAME AS column_name,"
" COLUMN_TYPE AS column_type,"
" IS_NULLABLE = 'YES' AS is_nullable,"
" COLUMN_TYPE LIKE '%unsigned' AS is_unsigned,"
" CHARACTER_MAXIMUM_LENGTH AS length,"
" NUMERIC_PRECISION as '',"
" IF(ISNULL(NUMERIC_SCALE), DATETIME_PRECISION, NUMERIC_SCALE) AS scale" // we know DATETIME_PRECISION as a scale in CH
" FROM INFORMATION_SCHEMA.COLUMNS"
" WHERE TABLE_SCHEMA = " << quote << database_name
<< " AND TABLE_NAME IN " << toQueryStringWithQuote(tables_name) << " ORDER BY ORDINAL_POSITION";
MySQLBlockInputStream result(pool.get(), query.str(), tables_columns_sample_block, DEFAULT_BLOCK_SIZE);
while (Block block = result.read())
{
const auto & table_name_col = *block.getByPosition(0).column;
const auto & column_name_col = *block.getByPosition(1).column;
const auto & column_type_col = *block.getByPosition(2).column;
const auto & is_nullable_col = *block.getByPosition(3).column;
const auto & is_unsigned_col = *block.getByPosition(4).column;
const auto & char_max_length_col = *block.getByPosition(5).column;
const auto & precision_col = *block.getByPosition(6).column;
const auto & scale_col = *block.getByPosition(7).column;
size_t rows = block.rows();
for (size_t i = 0; i < rows; ++i)
{
String table_name = table_name_col[i].safeGet<String>();
tables_and_columns[table_name].emplace_back(
column_name_col[i].safeGet<String>(),
convertMySQLDataType(
type_support,
column_type_col[i].safeGet<String>(),
external_table_functions_use_nulls && is_nullable_col[i].safeGet<UInt64>(),
is_unsigned_col[i].safeGet<UInt64>(),
char_max_length_col[i].safeGet<UInt64>(),
precision_col[i].safeGet<UInt64>(),
scale_col[i].safeGet<UInt64>()));
}
}
return tables_and_columns;
}
}
#endif

View File

@ -0,0 +1,28 @@
#pragma once
#include "config_core.h"
#if USE_MYSQL
#include <mysqlxx/Pool.h>
#include <common/types.h>
#include <Core/MultiEnum.h>
#include <Core/NamesAndTypes.h>
#include <Core/SettingsEnums.h>
#include <map>
#include <vector>
namespace DB
{
std::map<String, NamesAndTypesList> fetchTablesColumnsList(
mysqlxx::Pool & pool,
const String & database_name,
const std::vector<String> & tables_name,
bool external_table_functions_use_nulls,
MultiEnum<MySQLDataTypesSupport> type_support);
}
#endif

View File

@ -19,6 +19,7 @@ SRCS(
DatabaseWithDictionaries.cpp
MySQL/DatabaseConnectionMySQL.cpp
MySQL/DatabaseMaterializeMySQL.cpp
MySQL/FetchTablesColumnsList.cpp
MySQL/MaterializeMetadata.cpp
MySQL/MaterializeMySQLSettings.cpp
MySQL/MaterializeMySQLSyncThread.cpp

View File

@ -19,6 +19,7 @@ namespace DB
namespace ErrorCodes
{
extern const int TYPE_MISMATCH;
extern const int UNKNOWN_TYPE;
}
CassandraBlockInputStream::CassandraBlockInputStream(
@ -140,6 +141,8 @@ void CassandraBlockInputStream::insertValue(IColumn & column, ValueType type, co
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(uuid_str.data(), uuid_str.size()));
break;
}
default:
throw Exception("Unknown type : " + std::to_string(static_cast<int>(type)), ErrorCodes::UNKNOWN_TYPE);
}
}
@ -252,6 +255,8 @@ void CassandraBlockInputStream::assertTypes(const CassResultPtr & result)
expected = CASS_VALUE_TYPE_UUID;
expected_text = "uuid";
break;
default:
throw Exception("Unknown type : " + std::to_string(static_cast<int>(description.types[i].first)), ErrorCodes::UNKNOWN_TYPE);
}
CassValueType got = cass_result_column_type(result, i);

View File

@ -1,12 +1,13 @@
#include "ExecutableDictionarySource.h"
#include <future>
#include <thread>
#include <functional>
#include <ext/scope_guard.h>
#include <DataStreams/IBlockOutputStream.h>
#include <DataStreams/OwningBlockInputStream.h>
#include <Interpreters/Context.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/copyData.h>
#include <Common/ShellCommand.h>
#include <Common/ThreadPool.h>
#include <common/logger_useful.h>
@ -16,6 +17,7 @@
#include "DictionaryStructure.h"
#include "registerDictionaries.h"
namespace DB
{
static const UInt64 max_block_size = 8192;
@ -31,15 +33,23 @@ namespace
/// Owns ShellCommand and calls wait for it.
class ShellCommandOwningBlockInputStream : public OwningBlockInputStream<ShellCommand>
{
private:
Poco::Logger * log;
public:
ShellCommandOwningBlockInputStream(const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> own_)
: OwningBlockInputStream(std::move(impl), std::move(own_))
ShellCommandOwningBlockInputStream(Poco::Logger * log_, const BlockInputStreamPtr & impl, std::unique_ptr<ShellCommand> command_)
: OwningBlockInputStream(std::move(impl), std::move(command_)), log(log_)
{
}
void readSuffix() override
{
OwningBlockInputStream<ShellCommand>::readSuffix();
std::string err;
readStringUntilEOF(err, own->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
own->wait();
}
};
@ -80,7 +90,7 @@ BlockInputStreamPtr ExecutableDictionarySource::loadAll()
LOG_TRACE(log, "loadAll {}", toString());
auto process = ShellCommand::execute(command);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
@ -95,67 +105,73 @@ BlockInputStreamPtr ExecutableDictionarySource::loadUpdatedAll()
LOG_TRACE(log, "loadUpdatedAll {}", command_with_update_field);
auto process = ShellCommand::execute(command_with_update_field);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<ShellCommandOwningBlockInputStream>(input_stream, std::move(process));
return std::make_shared<ShellCommandOwningBlockInputStream>(log, input_stream, std::move(process));
}
namespace
{
/** A stream, that also runs and waits for background thread
* (that will feed data into pipe to be read from the other side of the pipe).
/** A stream, that runs child process and sends data to its stdin in background thread,
* and receives data from its stdout.
*/
class BlockInputStreamWithBackgroundThread final : public IBlockInputStream
{
public:
BlockInputStreamWithBackgroundThread(
const BlockInputStreamPtr & stream_, std::unique_ptr<ShellCommand> && command_, std::packaged_task<void()> && task_)
: stream{stream_}, command{std::move(command_)}, task(std::move(task_)), thread([this] {
task();
command->in.close();
})
const Context & context,
const std::string & format,
const Block & sample_block,
const std::string & command_str,
Poco::Logger * log_,
std::function<void(WriteBufferFromFile &)> && send_data_)
: log(log_),
command(ShellCommand::execute(command_str)),
send_data(std::move(send_data_)),
thread([this] { send_data(command->in); })
{
children.push_back(stream);
stream = context.getInputFormat(format, command->out, sample_block, max_block_size);
}
~BlockInputStreamWithBackgroundThread() override
{
if (thread.joinable())
{
try
{
readSuffix();
}
catch (...)
{
tryLogCurrentException(__PRETTY_FUNCTION__);
}
}
thread.join();
}
Block getHeader() const override { return stream->getHeader(); }
Block getHeader() const override
{
return stream->getHeader();
}
private:
Block readImpl() override { return stream->read(); }
Block readImpl() override
{
return stream->read();
}
void readPrefix() override
{
stream->readPrefix();
}
void readSuffix() override
{
IBlockInputStream::readSuffix();
if (!wait_called)
{
wait_called = true;
command->wait();
}
thread.join();
/// To rethrow an exception, if any.
task.get_future().get();
stream->readSuffix();
std::string err;
readStringUntilEOF(err, command->err);
if (!err.empty())
LOG_ERROR(log, "Having stderr: {}", err);
command->wait();
}
String getName() const override { return "WithBackgroundThread"; }
Poco::Logger * log;
BlockInputStreamPtr stream;
std::unique_ptr<ShellCommand> command;
std::packaged_task<void()> task;
std::function<void(WriteBufferFromFile &)> send_data;
ThreadFromGlobalPool thread;
bool wait_called = false;
};
}
@ -164,28 +180,29 @@ namespace
BlockInputStreamPtr ExecutableDictionarySource::loadIds(const std::vector<UInt64> & ids)
{
LOG_TRACE(log, "loadIds {} size = {}", toString(), ids.size());
auto process = ShellCommand::execute(command);
auto output_stream = context.getOutputFormat(format, process->in, sample_block);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>([output_stream, &ids]() mutable { formatIDs(output_stream, ids); }));
context, format, sample_block, command, log,
[&ids, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
formatIDs(output_stream, ids);
out.close();
});
}
BlockInputStreamPtr ExecutableDictionarySource::loadKeys(const Columns & key_columns, const std::vector<size_t> & requested_rows)
{
LOG_TRACE(log, "loadKeys {} size = {}", toString(), requested_rows.size());
auto process = ShellCommand::execute(command);
auto output_stream = context.getOutputFormat(format, process->in, sample_block);
auto input_stream = context.getInputFormat(format, process->out, sample_block, max_block_size);
return std::make_shared<BlockInputStreamWithBackgroundThread>(
input_stream, std::move(process), std::packaged_task<void()>([output_stream, key_columns, &requested_rows, this]() mutable
context, format, sample_block, command, log,
[key_columns, &requested_rows, this](WriteBufferFromFile & out) mutable
{
auto output_stream = context.getOutputFormat(format, out, sample_block);
formatKeys(dict_struct, output_stream, key_columns, requested_rows);
}));
out.close();
});
}
bool ExecutableDictionarySource::isModified() const

View File

@ -26,6 +26,7 @@ namespace DB
extern const int LOGICAL_ERROR;
extern const int NUMBER_OF_COLUMNS_DOESNT_MATCH;
extern const int INTERNAL_REDIS_ERROR;
extern const int UNKNOWN_TYPE;
}
@ -103,6 +104,8 @@ namespace DB
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insertValue(parse<UUID>(string_value));
break;
default:
throw Exception("Value of unsupported type:" + column.getName(), ErrorCodes::UNKNOWN_TYPE);
}
}
}

View File

@ -324,13 +324,85 @@ void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegm
target = std::move(file_segmentation_engine);
}
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorORC(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
FormatFactory::FormatFactory()
{
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineRegexp(*this);
registerFileSegmentationEngineJSONAsString(*this);
registerInputFormatNative(*this);
registerOutputFormatNative(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerInputFormatProcessorNative(*this);
registerOutputFormatProcessorNative(*this);
registerInputFormatProcessorRowBinary(*this);
@ -349,8 +421,11 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorJSONCompactEachRow(*this);
registerInputFormatProcessorProtobuf(*this);
registerOutputFormatProcessorProtobuf(*this);
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this);
registerInputFormatProcessorORC(*this);
registerOutputFormatProcessorORC(*this);
registerInputFormatProcessorParquet(*this);
@ -360,18 +435,6 @@ FormatFactory::FormatFactory()
registerInputFormatProcessorAvro(*this);
registerOutputFormatProcessorAvro(*this);
#endif
registerInputFormatProcessorTemplate(*this);
registerOutputFormatProcessorTemplate(*this);
registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorMsgPack(*this);
registerOutputFormatProcessorMsgPack(*this);
registerInputFormatProcessorJSONAsString(*this);
registerFileSegmentationEngineTabSeparated(*this);
registerFileSegmentationEngineCSV(*this);
registerFileSegmentationEngineJSONEachRow(*this);
registerFileSegmentationEngineRegexp(*this);
registerFileSegmentationEngineJSONAsString(*this);
registerOutputFormatNull(*this);
@ -381,12 +444,19 @@ FormatFactory::FormatFactory()
registerOutputFormatProcessorVertical(*this);
registerOutputFormatProcessorJSON(*this);
registerOutputFormatProcessorJSONCompact(*this);
registerOutputFormatProcessorJSONEachRowWithProgress(*this);
registerOutputFormatProcessorXML(*this);
registerOutputFormatProcessorODBCDriver2(*this);
registerOutputFormatProcessorNull(*this);
registerOutputFormatProcessorMySQLWire(*this);
registerOutputFormatProcessorMarkdown(*this);
registerOutputFormatProcessorPostgreSQLWire(*this);
registerInputFormatProcessorRegexp(*this);
registerInputFormatProcessorJSONAsString(*this);
#if !defined(ARCADIA_BUILD)
registerInputFormatProcessorCapnProto(*this);
#endif
}
FormatFactory & FormatFactory::instance()

View File

@ -141,73 +141,4 @@ private:
const Creators & getCreators(const String & name) const;
};
/// Formats for both input/output.
void registerInputFormatNative(FormatFactory & factory);
void registerOutputFormatNative(FormatFactory & factory);
void registerInputFormatProcessorNative(FormatFactory & factory);
void registerOutputFormatProcessorNative(FormatFactory & factory);
void registerInputFormatProcessorRowBinary(FormatFactory & factory);
void registerOutputFormatProcessorRowBinary(FormatFactory & factory);
void registerInputFormatProcessorTabSeparated(FormatFactory & factory);
void registerOutputFormatProcessorTabSeparated(FormatFactory & factory);
void registerInputFormatProcessorValues(FormatFactory & factory);
void registerOutputFormatProcessorValues(FormatFactory & factory);
void registerInputFormatProcessorCSV(FormatFactory & factory);
void registerOutputFormatProcessorCSV(FormatFactory & factory);
void registerInputFormatProcessorTSKV(FormatFactory & factory);
void registerOutputFormatProcessorTSKV(FormatFactory & factory);
void registerInputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRow(FormatFactory & factory);
void registerInputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompactEachRow(FormatFactory & factory);
void registerInputFormatProcessorParquet(FormatFactory & factory);
void registerOutputFormatProcessorParquet(FormatFactory & factory);
void registerInputFormatProcessorArrow(FormatFactory & factory);
void registerOutputFormatProcessorArrow(FormatFactory & factory);
void registerInputFormatProcessorProtobuf(FormatFactory & factory);
void registerOutputFormatProcessorProtobuf(FormatFactory & factory);
void registerInputFormatProcessorAvro(FormatFactory & factory);
void registerOutputFormatProcessorAvro(FormatFactory & factory);
void registerInputFormatProcessorTemplate(FormatFactory & factory);
void registerOutputFormatProcessorTemplate(FormatFactory & factory);
void registerInputFormatProcessorMsgPack(FormatFactory & factory);
void registerOutputFormatProcessorMsgPack(FormatFactory & factory);
void registerInputFormatProcessorORC(FormatFactory & factory);
void registerOutputFormatProcessorORC(FormatFactory & factory);
/// File Segmentation Engines for parallel reading
void registerFileSegmentationEngineTabSeparated(FormatFactory & factory);
void registerFileSegmentationEngineCSV(FormatFactory & factory);
void registerFileSegmentationEngineJSONEachRow(FormatFactory & factory);
void registerFileSegmentationEngineRegexp(FormatFactory & factory);
void registerFileSegmentationEngineJSONAsString(FormatFactory & factory);
/// Output only (presentational) formats.
void registerOutputFormatNull(FormatFactory & factory);
void registerOutputFormatProcessorPretty(FormatFactory & factory);
void registerOutputFormatProcessorPrettyCompact(FormatFactory & factory);
void registerOutputFormatProcessorPrettySpace(FormatFactory & factory);
void registerOutputFormatProcessorPrettyASCII(FormatFactory & factory);
void registerOutputFormatProcessorVertical(FormatFactory & factory);
void registerOutputFormatProcessorJSON(FormatFactory & factory);
void registerOutputFormatProcessorJSONCompact(FormatFactory & factory);
void registerOutputFormatProcessorJSONEachRowWithProgress(FormatFactory & factory);
void registerOutputFormatProcessorXML(FormatFactory & factory);
void registerOutputFormatProcessorODBCDriver2(FormatFactory & factory);
void registerOutputFormatProcessorNull(FormatFactory & factory);
void registerOutputFormatProcessorMySQLWire(FormatFactory & factory);
void registerOutputFormatProcessorMarkdown(FormatFactory & factory);
void registerOutputFormatProcessorPostgreSQLWire(FormatFactory & factory);
/// Input only formats.
void registerInputFormatProcessorCapnProto(FormatFactory & factory);
void registerInputFormatProcessorRegexp(FormatFactory & factory);
void registerInputFormatProcessorJSONAsString(FormatFactory & factory);
}

View File

@ -7,13 +7,15 @@
# include <Columns/ColumnNullable.h>
# include <Columns/ColumnString.h>
# include <Columns/ColumnsNumber.h>
# include <Columns/ColumnDecimal.h>
# include <DataTypes/IDataType.h>
# include <DataTypes/DataTypeNullable.h>
# include <IO/ReadHelpers.h>
# include <IO/WriteHelpers.h>
# include <Common/assert_cast.h>
# include <ext/range.h>
# include "MySQLBlockInputStream.h"
namespace DB
{
namespace ErrorCodes
@ -39,7 +41,7 @@ namespace
{
using ValueType = ExternalResultDescription::ValueType;
void insertValue(IColumn & column, const ValueType type, const mysqlxx::Value & value)
void insertValue(const IDataType & data_type, IColumn & column, const ValueType type, const mysqlxx::Value & value)
{
switch (type)
{
@ -85,6 +87,15 @@ namespace
case ValueType::vtUUID:
assert_cast<ColumnUInt128 &>(column).insert(parse<UUID>(value.data(), value.size()));
break;
case ValueType::vtDateTime64:[[fallthrough]];
case ValueType::vtDecimal32: [[fallthrough]];
case ValueType::vtDecimal64: [[fallthrough]];
case ValueType::vtDecimal128:
{
ReadBuffer buffer(const_cast<char *>(value.data()), value.size(), 0);
data_type.deserializeAsWholeText(column, buffer, FormatSettings{});
break;
}
}
}
@ -112,19 +123,21 @@ Block MySQLBlockInputStream::readImpl()
for (const auto idx : ext::range(0, row.size()))
{
const auto value = row[idx];
const auto & sample = description.sample_block.getByPosition(idx);
if (!value.isNull())
{
if (description.types[idx].second)
{
ColumnNullable & column_nullable = assert_cast<ColumnNullable &>(*columns[idx]);
insertValue(column_nullable.getNestedColumn(), description.types[idx].first, value);
const auto & data_type = assert_cast<const DataTypeNullable &>(*sample.type);
insertValue(*data_type.getNestedType(), column_nullable.getNestedColumn(), description.types[idx].first, value);
column_nullable.getNullMapData().emplace_back(0);
}
else
insertValue(*columns[idx], description.types[idx].first, value);
insertValue(*sample.type, *columns[idx], description.types[idx].first, value);
}
else
insertDefaultValue(*columns[idx], *description.sample_block.getByPosition(idx).column);
insertDefaultValue(*columns[idx], *sample.column);
}
++num_rows;

View File

@ -53,8 +53,28 @@ endif()
target_include_directories(clickhouse_functions SYSTEM PRIVATE ${SPARSEHASH_INCLUDE_DIR})
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
target_compile_options(clickhouse_functions PRIVATE "-g0")
if (CMAKE_BUILD_TYPE_UC STREQUAL "RELEASE"
OR CMAKE_BUILD_TYPE_UC STREQUAL "RELWITHDEBINFO"
OR CMAKE_BUILD_TYPE_UC STREQUAL "MINSIZEREL")
set (STRIP_DSF_DEFAULT ON)
else()
set (STRIP_DSF_DEFAULT OFF)
endif()
option(STRIP_DEBUG_SYMBOLS_FUNCTIONS
"Do not generate debugger info for ClickHouse functions.
Provides faster linking and lower binary size.
Tradeoff is the inability to debug some source files with e.g. gdb
(empty stack frames and no local variables)."
${STRIP_DSF_DEFAULT})
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
message(WARNING "Not generating debugger info for ClickHouse functions")
target_compile_options(clickhouse_functions PRIVATE "-g0")
else()
message(STATUS "Generating debugger info for ClickHouse functions")
endif()
if (USE_ICU)
target_link_libraries (clickhouse_functions PRIVATE ${ICU_LIBRARIES})

View File

@ -561,6 +561,9 @@ public:
template <template <typename, typename> class Op, typename Name, bool valid_on_default_arguments = true>
class FunctionBinaryArithmetic : public IFunction
{
static constexpr const bool is_multiply = IsOperation<Op>::multiply;
static constexpr const bool is_division = IsOperation<Op>::division;
const Context & context;
bool check_decimal_overflow = true;
@ -858,7 +861,7 @@ public:
return false;
else if constexpr (std::is_same_v<LeftDataType, RightDataType>)
{
if (left.getN() == right.getN())
if (left.getN() == right.getN())
{
type_res = std::make_shared<LeftDataType>(left.getN());
return true;
@ -872,10 +875,7 @@ public:
{
if constexpr (IsDataTypeDecimal<LeftDataType> && IsDataTypeDecimal<RightDataType>)
{
constexpr bool is_multiply = IsOperation<Op>::multiply;
constexpr bool is_division = IsOperation<Op>::division;
ResultDataType result_type = decimalResultType(left, right, is_multiply, is_division);
ResultDataType result_type = decimalResultType<is_multiply, is_division>(left, right);
type_res = std::make_shared<ResultDataType>(result_type.getPrecision(), result_type.getScale());
}
else if constexpr (IsDataTypeDecimal<LeftDataType>)
@ -899,7 +899,7 @@ public:
type_res = std::make_shared<ResultDataType>();
return true;
}
}
}
return false;
});
if (!valid)
@ -995,8 +995,6 @@ public:
if constexpr (!std::is_same_v<ResultDataType, InvalidType>)
{
constexpr bool result_is_decimal = IsDataTypeDecimal<LeftDataType> || IsDataTypeDecimal<RightDataType>;
constexpr bool is_multiply = IsOperation<Op>::multiply;
constexpr bool is_division = IsOperation<Op>::division;
using T0 = typename LeftDataType::FieldType;
using T1 = typename RightDataType::FieldType;
@ -1019,7 +1017,7 @@ public:
/// the only case with a non-vector result
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
ResultDataType type = decimalResultType<is_multiply, is_division>(left, right);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
@ -1044,7 +1042,7 @@ public:
typename ColVecResult::MutablePtr col_res = nullptr;
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
ResultDataType type = decimalResultType<is_multiply, is_division>(left, right);
col_res = ColVecResult::create(0, type.getScale());
}
else
@ -1059,7 +1057,7 @@ public:
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
ResultDataType type = decimalResultType<is_multiply, is_division>(left, right);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
@ -1079,12 +1077,13 @@ public:
{
if constexpr (result_is_decimal)
{
ResultDataType type = decimalResultType(left, right, is_multiply, is_division);
ResultDataType type = decimalResultType<is_multiply, is_division>(left, right);
typename ResultDataType::FieldType scale_a = type.scaleFactorFor(left, is_multiply);
typename ResultDataType::FieldType scale_b = type.scaleFactorFor(right, is_multiply || is_division);
if constexpr (IsDataTypeDecimal<RightDataType> && is_division)
scale_a = right.getScaleMultiplier();
if (auto col_right = checkAndGetColumn<ColVecT1>(col_right_raw))
{
OpImpl::vectorVector(col_left->getData(), col_right->getData(), vec_res, scale_a, scale_b,

View File

@ -2260,9 +2260,7 @@ private:
size_t nullable_pos = block.columns() - 1;
nullable_col = typeid_cast<const ColumnNullable *>(block.getByPosition(nullable_pos).column.get());
if (!nullable_col)
throw Exception("Last column should be ColumnNullable", ErrorCodes::LOGICAL_ERROR);
if (col && nullable_col->size() != col->size())
if (col && nullable_col && nullable_col->size() != col->size())
throw Exception("ColumnNullable is not compatible with original", ErrorCodes::LOGICAL_ERROR);
}

View File

@ -3,5 +3,6 @@ add_headers_and_sources(clickhouse_functions_gatherutils .)
add_library(clickhouse_functions_gatherutils ${clickhouse_functions_gatherutils_sources} ${clickhouse_functions_gatherutils_headers})
target_link_libraries(clickhouse_functions_gatherutils PRIVATE dbms)
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
target_compile_options(clickhouse_functions_gatherutils PRIVATE "-g0")
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
target_compile_options(clickhouse_functions_gatherutils PRIVATE "-g0")
endif()

View File

@ -3,8 +3,9 @@ add_headers_and_sources(clickhouse_functions_url .)
add_library(clickhouse_functions_url ${clickhouse_functions_url_sources} ${clickhouse_functions_url_headers})
target_link_libraries(clickhouse_functions_url PRIVATE dbms)
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
target_compile_options(clickhouse_functions_url PRIVATE "-g0")
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
target_compile_options(clickhouse_functions_url PRIVATE "-g0")
endif()
# TODO: move Functions/Regexps.h to some lib and use here
target_link_libraries(clickhouse_functions_url PRIVATE hyperscan)

View File

@ -3,5 +3,6 @@ add_headers_and_sources(clickhouse_functions_array .)
add_library(clickhouse_functions_array ${clickhouse_functions_array_sources} ${clickhouse_functions_array_headers})
target_link_libraries(clickhouse_functions_array PRIVATE dbms clickhouse_functions_gatherutils)
# Won't generate debug info for files with heavy template instantiation to achieve faster linking and lower size.
target_compile_options(clickhouse_functions_array PRIVATE "-g0")
if (STRIP_DEBUG_SYMBOLS_FUNCTIONS)
target_compile_options(clickhouse_functions_array PRIVATE "-g0")
endif()

View File

@ -1,5 +1,3 @@
#include <optional>
#include <type_traits>
#include <Functions/IFunctionImpl.h>
#include <Functions/FunctionFactory.h>
#include <Functions/FunctionHelpers.h>
@ -799,7 +797,7 @@ private:
block.getByPosition(result).column = std::move(col_result);
return true;
}
else if (col_lc->getDictionaryPtr()->isNullable()) // LC(Nullable(T)) and U
else if (col_lc->nestedIsNullable()) // LC(Nullable(T)) and U
{
const ColumnPtr left_casted = col_lc->convertToFullColumnIfLowCardinality(); // Nullable(T)
const ColumnNullable& left_nullable = *checkAndGetColumn<ColumnNullable>(left_casted.get());

View File

@ -10,7 +10,6 @@ ADDINCL(
contrib/libs/farmhash
contrib/libs/h3/h3lib/include
contrib/libs/hyperscan/src
contrib/libs/icu/common
contrib/libs/libdivide
contrib/libs/rapidjson/include
contrib/libs/xxhash

View File

@ -9,7 +9,6 @@ ADDINCL(
contrib/libs/farmhash
contrib/libs/h3/h3lib/include
contrib/libs/hyperscan/src
contrib/libs/icu/common
contrib/libs/libdivide
contrib/libs/rapidjson/include
contrib/libs/xxhash

View File

@ -123,6 +123,11 @@ public:
return bytes_ignored;
}
void ignoreAll()
{
tryIgnore(std::numeric_limits<size_t>::max());
}
/** Reads a single byte. */
bool ALWAYS_INLINE read(char & c)
{

View File

@ -1,6 +1,7 @@
#pragma once
#include <Common/typeid_cast.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTQueryWithTableAndOutput.h>
#include <Parsers/ASTRenameQuery.h>
#include <Parsers/ASTIdentifier.h>
@ -23,8 +24,9 @@ namespace DB
class AddDefaultDatabaseVisitor
{
public:
AddDefaultDatabaseVisitor(const String & database_name_, std::ostream * ostr_ = nullptr)
AddDefaultDatabaseVisitor(const String & database_name_, bool only_replace_current_database_function_ = false, std::ostream * ostr_ = nullptr)
: database_name(database_name_),
only_replace_current_database_function(only_replace_current_database_function_),
visit_depth(0),
ostr(ostr_)
{}
@ -34,7 +36,8 @@ public:
visitDDLChildren(ast);
if (!tryVisitDynamicCast<ASTQueryWithTableAndOutput>(ast) &&
!tryVisitDynamicCast<ASTRenameQuery>(ast))
!tryVisitDynamicCast<ASTRenameQuery>(ast) &&
!tryVisitDynamicCast<ASTFunction>(ast))
{}
}
@ -60,6 +63,7 @@ public:
private:
const String database_name;
bool only_replace_current_database_function = false;
mutable size_t visit_depth;
std::ostream * ostr;
@ -164,12 +168,18 @@ private:
void visitDDL(ASTQueryWithTableAndOutput & node, ASTPtr &) const
{
if (only_replace_current_database_function)
return;
if (node.database.empty())
node.database = database_name;
}
void visitDDL(ASTRenameQuery & node, ASTPtr &) const
{
if (only_replace_current_database_function)
return;
for (ASTRenameQuery::Element & elem : node.elements)
{
if (elem.from.database.empty())
@ -179,6 +189,15 @@ private:
}
}
void visitDDL(ASTFunction & function, ASTPtr & node) const
{
if (function.name == "currentDatabase")
{
node = std::make_shared<ASTLiteral>(database_name);
return;
}
}
void visitDDLChildren(ASTPtr & ast) const
{
for (auto & child : ast->children)

View File

@ -1434,9 +1434,11 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
[](const AccessRightsElement & elem) { return elem.isEmptyDatabase(); })
!= query_requires_access.end());
bool use_local_default_database = false;
const String & current_database = context.getCurrentDatabase();
if (need_replace_current_database)
{
bool use_local_default_database = false;
Strings shard_default_databases;
for (const auto & shard : shards)
{
@ -1457,10 +1459,6 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
if (use_local_default_database)
{
const String & current_database = context.getCurrentDatabase();
AddDefaultDatabaseVisitor visitor(current_database);
visitor.visitDDL(query_ptr);
query_requires_access.replaceEmptyDatabase(current_database);
}
else
@ -1481,6 +1479,9 @@ BlockIO executeDDLQueryOnCluster(const ASTPtr & query_ptr_, const Context & cont
}
}
AddDefaultDatabaseVisitor visitor(current_database, !use_local_default_database);
visitor.visitDDL(query_ptr);
/// Check access rights, assume that all servers have the same users config
if (query_requires_grant_option)
context.getAccess()->checkGrantOption(query_requires_access);

View File

@ -22,7 +22,7 @@ namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int WRONG_GLOBAL_SUBQUERY;
}
@ -73,8 +73,7 @@ public:
is_table = true;
if (!subquery_or_table_name)
throw Exception("Logical error: unknown AST element passed to ExpressionAnalyzer::addExternalStorage method",
ErrorCodes::LOGICAL_ERROR);
throw Exception("Global subquery requires subquery or table name", ErrorCodes::WRONG_GLOBAL_SUBQUERY);
if (is_table)
{

View File

@ -152,7 +152,7 @@ void QueryNormalizer::visitChildren(const ASTPtr & node, Data & data)
{
if (const auto * func_node = node->as<ASTFunction>())
{
if (func_node->query)
if (func_node->tryGetQueryArgument())
{
if (func_node->name != "view")
throw Exception("Query argument can only be used in the `view` TableFunction", ErrorCodes::BAD_ARGUMENTS);

View File

@ -13,6 +13,9 @@ ASTPtr ASTExpressionList::clone() const
void ASTExpressionList::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
if (frame.expression_list_prepend_whitespace)
settings.ostr << ' ';
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{
if (it != children.begin())
@ -30,6 +33,12 @@ void ASTExpressionList::formatImplMultiline(const FormatSettings & settings, For
{
std::string indent_str = "\n" + std::string(4 * (frame.indent + 1), ' ');
if (frame.expression_list_prepend_whitespace)
{
if (!(children.size() > 1 || frame.expression_list_always_start_on_new_line))
settings.ostr << ' ';
}
++frame.indent;
for (ASTs::const_iterator it = children.begin(); it != children.end(); ++it)
{

View File

@ -48,7 +48,6 @@ ASTPtr ASTFunction::clone() const
auto res = std::make_shared<ASTFunction>(*this);
res->children.clear();
if (query) { res->query = query->clone(); res->children.push_back(res->query); }
if (arguments) { res->arguments = arguments->clone(); res->children.push_back(res->arguments); }
if (parameters) { res->parameters = parameters->clone(); res->children.push_back(res->parameters); }
@ -112,14 +111,25 @@ static bool highlightStringLiteralWithMetacharacters(const ASTPtr & node, const
}
ASTSelectWithUnionQuery * ASTFunction::tryGetQueryArgument() const
{
if (arguments && arguments->children.size() == 1)
{
return arguments->children[0]->as<ASTSelectWithUnionQuery>();
}
return nullptr;
}
void ASTFunction::formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.expression_list_prepend_whitespace = false;
FormatStateStacked nested_need_parens = frame;
FormatStateStacked nested_dont_need_parens = frame;
nested_need_parens.need_parens = true;
nested_dont_need_parens.need_parens = false;
if (query)
if (auto * query = tryGetQueryArgument())
{
std::string nl_or_nothing = settings.one_line ? "" : "\n";
std::string indent_str = settings.one_line ? "" : std::string(4u * frame.indent, ' ');

View File

@ -2,6 +2,7 @@
#include <Parsers/ASTWithAlias.h>
#include <Parsers/ASTExpressionList.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
namespace DB
@ -13,7 +14,6 @@ class ASTFunction : public ASTWithAlias
{
public:
String name;
ASTPtr query; // It's possible for a function to accept a query as its only argument.
ASTPtr arguments;
/// parameters - for parametric aggregate function. Example: quantile(0.9)(x) - what in first parens are 'parameters'.
ASTPtr parameters;
@ -26,6 +26,8 @@ public:
void updateTreeHashImpl(SipHash & hash_state) const override;
ASTSelectWithUnionQuery * tryGetQueryArgument() const;
protected:
void formatImplWithoutAlias(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
void appendColumnNameImpl(WriteBuffer & ostr) const override;

View File

@ -72,18 +72,20 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
{
frame.current_select = this;
frame.need_parens = false;
frame.expression_list_prepend_whitespace = true;
std::string indent_str = s.one_line ? "" : std::string(4 * frame.indent, ' ');
if (with())
{
s.ostr << (s.hilite ? hilite_keyword : "") << indent_str << "WITH " << (s.hilite ? hilite_none : "");
s.ostr << (s.hilite ? hilite_keyword : "") << indent_str << "WITH" << (s.hilite ? hilite_none : "");
s.one_line
? with()->formatImpl(s, state, frame)
: with()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
s.ostr << s.nl_or_ws;
}
s.ostr << (s.hilite ? hilite_keyword : "") << indent_str << "SELECT " << (distinct ? "DISTINCT " : "") << (s.hilite ? hilite_none : "");
s.ostr << (s.hilite ? hilite_keyword : "") << indent_str << "SELECT" << (distinct ? " DISTINCT" : "") << (s.hilite ? hilite_none : "");
s.one_line
? select()->formatImpl(s, state, frame)
@ -109,7 +111,7 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
if (groupBy())
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "GROUP BY " << (s.hilite ? hilite_none : "");
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "GROUP BY" << (s.hilite ? hilite_none : "");
s.one_line
? groupBy()->formatImpl(s, state, frame)
: groupBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
@ -132,7 +134,7 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
if (orderBy())
{
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "ORDER BY " << (s.hilite ? hilite_none : "");
s.ostr << (s.hilite ? hilite_keyword : "") << s.nl_or_ws << indent_str << "ORDER BY" << (s.hilite ? hilite_none : "");
s.one_line
? orderBy()->formatImpl(s, state, frame)
: orderBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);
@ -147,7 +149,7 @@ void ASTSelectQuery::formatImpl(const FormatSettings & s, FormatState & state, F
s.ostr << ", ";
}
limitByLength()->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " BY " << (s.hilite ? hilite_none : "");
s.ostr << (s.hilite ? hilite_keyword : "") << " BY" << (s.hilite ? hilite_none : "");
s.one_line
? limitBy()->formatImpl(s, state, frame)
: limitBy()->as<ASTExpressionList &>().formatImplMultiline(s, state, frame);

View File

@ -57,6 +57,11 @@ void ASTTTLElement::formatImpl(const FormatSettings & settings, FormatState & st
}
}
}
else if (mode == TTLMode::RECOMPRESS)
{
settings.ostr << " RECOMPRESS ";
recompression_codec->formatImpl(settings, state, frame);
}
else if (mode == TTLMode::DELETE)
{
/// It would be better to output "DELETE" here but that will break compatibility with earlier versions.

View File

@ -20,6 +20,8 @@ public:
ASTs group_by_key;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
ASTPtr recompression_codec;
ASTTTLElement(TTLMode mode_, DataDestinationType destination_type_, const String & destination_name_)
: mode(mode_)
, destination_type(destination_type_)

View File

@ -210,6 +210,7 @@ void ASTTableJoin::formatImplBeforeTable(const FormatSettings & settings, Format
void ASTTableJoin::formatImplAfterTable(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.need_parens = false;
frame.expression_list_prepend_whitespace = false;
if (using_expression_list)
{
@ -236,8 +237,10 @@ void ASTTableJoin::formatImpl(const FormatSettings & settings, FormatState & sta
void ASTArrayJoin::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
frame.expression_list_prepend_whitespace = true;
settings.ostr << (settings.hilite ? hilite_keyword : "")
<< (kind == Kind::Left ? "LEFT " : "") << "ARRAY JOIN " << (settings.hilite ? hilite_none : "");
<< (kind == Kind::Left ? "LEFT " : "") << "ARRAY JOIN" << (settings.hilite ? hilite_none : "");
settings.one_line
? expression_list->formatImpl(settings, state, frame)

View File

@ -260,8 +260,10 @@ bool ParserFunction::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
++pos;
auto function_node = std::make_shared<ASTFunction>();
tryGetIdentifierNameInto(identifier, function_node->name);
function_node->query = query;
function_node->children.push_back(function_node->query);
auto expr_list_with_single_query = std::make_shared<ASTExpressionList>();
expr_list_with_single_query->children.push_back(query);
function_node->arguments = expr_list_with_single_query;
function_node->children.push_back(function_node->arguments);
node = function_node;
return true;
}
@ -1660,6 +1662,8 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_where("WHERE");
ParserKeyword s_group_by("GROUP BY");
ParserKeyword s_set("SET");
ParserKeyword s_recompress("RECOMPRESS");
ParserKeyword s_codec("CODEC");
ParserToken s_comma(TokenType::Comma);
ParserToken s_eq(TokenType::Equals);
@ -1667,6 +1671,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserStringLiteral parser_string_literal;
ParserExpression parser_exp;
ParserExpressionList parser_expression_list(false);
ParserCodec parser_codec;
ASTPtr ttl_expr;
if (!parser_exp.parse(pos, ttl_expr, expected))
@ -1690,6 +1695,10 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
mode = TTLMode::GROUP_BY;
}
else if (s_recompress.ignore(pos))
{
mode = TTLMode::RECOMPRESS;
}
else
{
s_delete.ignore(pos);
@ -1698,6 +1707,7 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ASTPtr where_expr;
ASTPtr ast_group_by_key;
ASTPtr recompression_codec;
std::vector<std::pair<String, ASTPtr>> group_by_aggregations;
if (mode == TTLMode::MOVE)
@ -1741,6 +1751,14 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
if (!parser_exp.parse(pos, where_expr, expected))
return false;
}
else if (mode == TTLMode::RECOMPRESS)
{
if (!s_codec.ignore(pos))
return false;
if (!parser_codec.parse(pos, recompression_codec, expected))
return false;
}
auto ttl_element = std::make_shared<ASTTTLElement>(mode, destination_type, destination_name);
ttl_element->setTTL(std::move(ttl_expr));
@ -1753,6 +1771,9 @@ bool ParserTTLElement::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ttl_element->group_by_aggregations = std::move(group_by_aggregations);
}
if (mode == TTLMode::RECOMPRESS)
ttl_element->recompression_codec = recompression_codec;
node = ttl_element;
return true;
}

View File

@ -203,6 +203,7 @@ public:
UInt8 indent = 0;
bool need_parens = false;
bool expression_list_always_start_on_new_line = false; /// Line feed and indent before expression list even if it's of single element.
bool expression_list_prepend_whitespace = false; /// Prepend whitespace (if it is required)
const IAST * current_select = nullptr;
};

View File

@ -253,15 +253,27 @@ Token Lexer::nextTokenImpl()
else
{
++pos;
/// Nested multiline comments are supported according to the SQL standard.
size_t nesting_level = 1;
while (pos + 2 <= end)
{
/// This means that nested multiline comments are not supported.
if (pos[0] == '*' && pos[1] == '/')
if (pos[0] == '/' && pos[1] == '*')
{
pos += 2;
return Token(TokenType::Comment, token_begin, pos);
++nesting_level;
}
++pos;
else if (pos[0] == '*' && pos[1] == '/')
{
pos += 2;
--nesting_level;
if (nesting_level == 0)
return Token(TokenType::Comment, token_begin, pos);
}
else
++pos;
}
return Token(TokenType::ErrorMultilineCommentIsNotClosed, token_begin, end);
}

Some files were not shown because too many files have changed in this diff Show More