Merge remote-tracking branch 'origin/master' into pr-custom-key-failover

This commit is contained in:
Igor Nikonov 2023-11-30 14:09:26 +00:00
commit b4bf611d5d
217 changed files with 4808 additions and 883 deletions

View File

@ -33,8 +33,6 @@ curl https://clickhouse.com/ | sh
## Upcoming Events
* [**ClickHouse Meetup in San Francisco**](https://www.meetup.com/clickhouse-silicon-valley-meetup-group/events/296334923/) - Nov 14
* [**ClickHouse Meetup in Singapore**](https://www.meetup.com/clickhouse-singapore-meetup-group/events/296334976/) - Nov 15
* [**ClickHouse Meetup in Berlin**](https://www.meetup.com/clickhouse-berlin-user-group/events/296488501/) - Nov 30
* [**ClickHouse Meetup in NYC**](https://www.meetup.com/clickhouse-new-york-user-group/events/296488779/) - Dec 11
* [**ClickHouse Meetup in Boston**](https://www.meetup.com/clickhouse-boston-user-group/events/296488840/) - Dec 12

contrib/libpqxx vendored

@ -1 +1 @@
Subproject commit 791d68fd89902835133c50435e380ec7a73271b7
Subproject commit c995193a3a14d71f4711f1f421f65a1a1db64640

View File

@ -39,8 +39,8 @@ If you need to update rows frequently, we recommend using the [`ReplacingMergeTr
``` sql
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
(
name1 [type1] [[NOT] NULL] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr1] [COMMENT ...] [CODEC(codec1)] [TTL expr1] [PRIMARY KEY],
name2 [type2] [[NOT] NULL] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr2] [COMMENT ...] [CODEC(codec2)] [TTL expr2] [PRIMARY KEY],
name1 [type1] [[NOT] NULL] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr1] [COMMENT ...] [CODEC(codec1)] [STATISTIC(stat1)] [TTL expr1] [PRIMARY KEY],
name2 [type2] [[NOT] NULL] [DEFAULT|MATERIALIZED|ALIAS|EPHEMERAL expr2] [COMMENT ...] [CODEC(codec2)] [STATISTIC(stat2)] [TTL expr2] [PRIMARY KEY],
...
INDEX index_name1 expr1 TYPE type1(...) [GRANULARITY value1],
INDEX index_name2 expr2 TYPE type2(...) [GRANULARITY value2],
@ -1358,3 +1358,33 @@ In this sample configuration:
- `_partition_value` — Values (a tuple) of a `partition by` expression.
- `_sample_factor` — Sample factor (from the query).
- `_block_number` — Block number of the row; it is persisted on merges when `allow_experimental_block_number_column` is set to true.
## Column Statistics (Experimental) {#column-statistics}
The statistics declaration is in the columns section of the `CREATE` query for tables from the `*MergeTree*` family when `set allow_experimental_statistic = 1` is enabled.
``` sql
CREATE TABLE example_table
(
a Int64 STATISTIC(tdigest),
b Float64
)
ENGINE = MergeTree
ORDER BY a
```
We can also manipulate statistics with `ALTER` statements.
```sql
ALTER TABLE example_table ADD STATISTIC b TYPE tdigest;
ALTER TABLE example_table DROP STATISTIC a TYPE tdigest;
```
These lightweight statistics aggregate information about the distribution of values in columns.
They can be used for query optimization when `set allow_statistic_optimize = 1` is enabled.
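For illustration, a minimal sketch of enabling the optimization for the `example_table` defined above (whether the optimizer actually reorders conditions depends on the collected statistics):
```sql
-- Assumes example_table from the CREATE TABLE example above, with a tdigest statistic on column `a`.
SET allow_experimental_statistic = 1;
SET allow_statistic_optimize = 1;

-- The tdigest statistic helps estimate how selective `a < 100` is,
-- so the more selective condition can be evaluated first.
SELECT count() FROM example_table WHERE b < 10 AND a < 100;
```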
#### Available Types of Column Statistics {#available-types-of-column-statistics}
- `tdigest`
Stores the distribution of values from numeric columns in a [TDigest](https://github.com/tdunning/t-digest) sketch.

View File

@ -16,9 +16,9 @@ More information about PGO in ClickHouse you can read in the corresponding GitHu
There are two major kinds of PGO: [Instrumentation](https://clang.llvm.org/docs/UsersManual.html#profiling-with-instrumentation) and [Sampling](https://clang.llvm.org/docs/UsersManual.html#using-sampling-profilers) (also known as AutoFDO). This guide describes Instrumentation PGO with ClickHouse.
1. Build ClickHouse in Instrumented mode. In Clang it can be done via passing `-fprofile-instr-generate` option to `CXXFLAGS`.
1. Build ClickHouse in Instrumented mode. In Clang it can be done via passing `-fprofile-generate` option to `CXXFLAGS`.
2. Run the instrumented ClickHouse on a sample workload. Here you need to use your usual workload. One approach is to use [ClickBench](https://github.com/ClickHouse/ClickBench) as a sample workload. ClickHouse in instrumented mode can run slowly, so be prepared for that and do not run instrumented ClickHouse in performance-critical environments.
3. Recompile ClickHouse once again with `-fprofile-instr-use` compiler flags and profiles that are collected from the previous step.
3. Recompile ClickHouse once again with `-fprofile-use` compiler flags and profiles that are collected from the previous step.
A more detailed guide on how to apply PGO is in the Clang [documentation](https://clang.llvm.org/docs/UsersManual.html#profile-guided-optimization).

View File

@ -1835,9 +1835,10 @@ Settings:
- `endpoint` HTTP endpoint for scraping metrics by prometheus server. Start from /.
- `port` Port for `endpoint`.
- `metrics` Flag that sets to expose metrics from the [system.metrics](../../operations/system-tables/metrics.md#system_tables-metrics) table.
- `events` Flag that sets to expose metrics from the [system.events](../../operations/system-tables/events.md#system_tables-events) table.
- `asynchronous_metrics` Flag that sets to expose current metrics values from the [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) table.
- `metrics` Expose metrics from the [system.metrics](../../operations/system-tables/metrics.md#system_tables-metrics) table.
- `events` Expose metrics from the [system.events](../../operations/system-tables/events.md#system_tables-events) table.
- `asynchronous_metrics` Expose current metrics values from the [system.asynchronous_metrics](../../operations/system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) table.
- `errors` - Expose the number of errors by error code that have occurred since the last server restart. This information can also be obtained from the [system.errors](../../operations/system-tables/errors.md#system_tables-errors) table.
**Example**
@ -1853,6 +1854,7 @@ Settings:
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
<errors>true</errors>
</prometheus>
<!-- highlight-end -->
</clickhouse>
@ -2350,7 +2352,7 @@ Path on the local filesystem to store temporary data for processing large querie
## user_files_path {#user_files_path}
The directory with user files. Used in the table function [file()](../../sql-reference/table-functions/file.md).
The directory with user files. Used in the table functions [file()](../../sql-reference/table-functions/file.md) and [fileCluster()](../../sql-reference/table-functions/fileCluster.md).
**Example**

View File

@ -4801,6 +4801,14 @@ a Tuple(
)
```
## allow_experimental_statistic {#allow_experimental_statistic}
Allows defining columns with [statistics](../../engines/table-engines/mergetree-family/mergetree.md#table_engine-mergetree-creating-a-table) and [manipulating statistics](../../engines/table-engines/mergetree-family/mergetree.md#column-statistics).
## allow_statistic_optimize {#allow_statistic_optimize}
Allows using statistics to optimize the order of [prewhere conditions](../../sql-reference/statements/select/prewhere.md).
## analyze_index_with_space_filling_curves
If a table has a space-filling curve in its index, e.g. `ORDER BY mortonEncode(x, y)`, and the query has conditions on its arguments, e.g. `x >= 10 AND x <= 20 AND y >= 20 AND y <= 30`, use the space-filling curve for index analysis.
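As an illustrative sketch (the table and data are hypothetical), a layout where this setting can apply:
```sql
CREATE TABLE points
(
    x UInt32,
    y UInt32
)
ENGINE = MergeTree
ORDER BY mortonEncode(x, y);

-- With analyze_index_with_space_filling_curves enabled, the range conditions on x and y
-- can be mapped onto ranges of the Morton curve during index analysis.
SELECT count() FROM points WHERE x >= 10 AND x <= 20 AND y >= 20 AND y <= 30;
```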

View File

@ -0,0 +1,48 @@
---
toc_priority: 112
---
# groupArraySorted {#groupArraySorted}
Returns an array with the first N items in ascending order.
``` sql
groupArraySorted(N)(column)
```
**Arguments**
- `N` The number of elements to return. If the parameter is omitted, the default value is the size of the input.
- `column` The value (Integer, String, Float and other Generic types).
**Example**
Gets the first 10 numbers:
``` sql
SELECT groupArraySorted(10)(number) FROM numbers(100)
```
``` text
┌─groupArraySorted(10)(number)─┐
│ [0,1,2,3,4,5,6,7,8,9] │
└──────────────────────────────┘
```
Gets the String representations of all numbers in the column:
``` sql
SELECT groupArraySorted(str) FROM (SELECT toString(number) as str FROM numbers(5));
```
``` text
┌─groupArraySorted(str)────────┐
│ ['0','1','2','3','4'] │
└──────────────────────────────┘
```

View File

@ -54,6 +54,7 @@ ClickHouse-specific aggregate functions:
- [groupArrayMovingAvg](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingavg.md)
- [groupArrayMovingSum](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingsum.md)
- [groupArraySample](./grouparraysample.md)
- [groupArraySorted](/docs/en/sql-reference/aggregate-functions/reference/grouparraysorted.md)
- [groupBitAnd](/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md)
- [groupBitOr](/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md)
- [groupBitXor](/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md)

View File

@ -16,6 +16,7 @@ Most `ALTER TABLE` queries modify table settings or data:
- [INDEX](/docs/en/sql-reference/statements/alter/skipping-index.md)
- [CONSTRAINT](/docs/en/sql-reference/statements/alter/constraint.md)
- [TTL](/docs/en/sql-reference/statements/alter/ttl.md)
- [STATISTIC](/docs/en/sql-reference/statements/alter/statistic.md)
:::note
Most `ALTER TABLE` queries are supported only for [\*MergeTree](/docs/en/engines/table-engines/mergetree-family/index.md) tables, as well as [Merge](/docs/en/engines/table-engines/special/merge.md) and [Distributed](/docs/en/engines/table-engines/special/distributed.md).

View File

@ -0,0 +1,25 @@
---
slug: /en/sql-reference/statements/alter/statistic
sidebar_position: 45
sidebar_label: STATISTIC
---
# Manipulating Column Statistics
The following operations are available:
- `ALTER TABLE [db.]table ADD STATISTIC (columns list) TYPE type` - Adds a statistic description to the table's metadata.
- `ALTER TABLE [db.]table DROP STATISTIC (columns list) TYPE type` - Removes the statistic description from the table's metadata and deletes the statistic files from disk.
- `ALTER TABLE [db.]table CLEAR STATISTIC (columns list) TYPE type` - Deletes the statistic files from disk.
- `ALTER TABLE [db.]table MATERIALIZE STATISTIC (columns list) TYPE type` - Rebuilds the statistic for columns. Implemented as a [mutation](../../../sql-reference/statements/alter/index.md#mutations).
The first two commands are lightweight in the sense that they only change metadata or remove files.
They are also replicated, syncing statistics metadata via ZooKeeper.
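For example, a minimal sketch against a hypothetical `*MergeTree` table `tab` with numeric columns `a` and `b`, following the syntax listed above:
```sql
-- Assumes allow_experimental_statistic is enabled and `tab` uses a *MergeTree engine.
ALTER TABLE tab ADD STATISTIC (a, b) TYPE tdigest;
ALTER TABLE tab MATERIALIZE STATISTIC (a, b) TYPE tdigest;

-- Remove the statistics later if they are no longer needed:
ALTER TABLE tab DROP STATISTIC (a, b) TYPE tdigest;
```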
:::note
Statistic manipulation is supported only for tables with [`*MergeTree`](../../../engines/table-engines/mergetree-family/mergetree.md) engine (including [replicated](../../../engines/table-engines/mergetree-family/replication.md) variants).
:::

View File

@ -0,0 +1,85 @@
---
slug: /en/sql-reference/table-functions/fileCluster
sidebar_position: 61
sidebar_label: fileCluster
---
# fileCluster Table Function
Enables simultaneous processing of files matching a specified path across multiple nodes within a cluster. The initiator establishes connections to worker nodes, expands globs in the file path, and delegates file-reading tasks to worker nodes. Each worker node queries the initiator for the next file to process, repeating until all tasks are completed (all files have been read).
:::note
This function will operate _correctly_ only if the set of files matching the initially specified path is identical across all nodes and their content is consistent among different nodes.
If these files differ between nodes, the return value cannot be predetermined and depends on the order in which worker nodes request tasks from the initiator.
:::
**Syntax**
``` sql
fileCluster(cluster_name, path[, format, structure, compression_method])
```
**Arguments**
- `cluster_name` — Name of a cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `path` — The relative path to the file from [user_files_path](/docs/en/operations/server-configuration-parameters/settings.md#server_configuration_parameters-user_files_path). Path to file also supports [globs](#globs_in_path).
- `format` — [Format](../../interfaces/formats.md#formats) of the files. Type: [String](../../sql-reference/data-types/string.md).
- `structure` — Table structure in `'UserID UInt64, Name String'` format. Determines column names and types. Type: [String](../../sql-reference/data-types/string.md).
- `compression_method` — Compression method. Supported compression types are `gz`, `br`, `xz`, `zst`, `lz4`, and `bz2`.
**Returned value**
A table with the specified format and structure and with data from files matching the specified path.
**Example**
Given a cluster named `my_cluster` and the following value of the `user_files_path` setting:
``` bash
$ grep user_files_path /etc/clickhouse-server/config.xml
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
```
Also, given there are files `test1.csv` and `test2.csv` inside `user_files_path` of each cluster node, and their content is identical across different nodes:
```bash
$ cat /var/lib/clickhouse/user_files/test1.csv
1,"file1"
11,"file11"
$ cat /var/lib/clickhouse/user_files/test2.csv
2,"file2"
22,"file22"
```
For example, one can create these files by executing these two queries on every cluster node:
```sql
INSERT INTO TABLE FUNCTION file('test1.csv', 'CSV', 'i UInt32, s String') VALUES (1,'file1'), (11,'file11');
INSERT INTO TABLE FUNCTION file('test2.csv', 'CSV', 'i UInt32, s String') VALUES (2,'file2'), (22,'file22');
```
Now, read the contents of `test1.csv` and `test2.csv` via the `fileCluster` table function:
```sql
SELECT * FROM fileCluster(
    'my_cluster', 'test{1,2}.csv', 'CSV', 'i UInt32, s String'
) ORDER BY (i, s)
```
┌──i─┬─s──────┐
│ 1 │ file1 │
│ 11 │ file11 │
└────┴────────┘
┌──i─┬─s──────┐
│ 2 │ file2 │
│ 22 │ file22 │
└────┴────────┘
```
## Globs in Path {#globs_in_path}
All patterns supported by the [File](../../sql-reference/table-functions/file.md#globs-in-path) table function are also supported by `fileCluster`.
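For illustration, a sketch using a numeric-range glob (the cluster name and file layout are hypothetical):
```sql
SELECT count()
FROM fileCluster('my_cluster', 'part_{0..9}.csv', 'CSV', 'i UInt32, s String');
```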
**See Also**
- [File table function](../../sql-reference/table-functions/file.md)

View File

@ -1215,6 +1215,7 @@ ClickHouse использует потоки из глобального пул
- `metrics` flag for exporting current metric values from the [system.metrics](../system-tables/metrics.md#system_tables-metrics) table.
- `events` flag for exporting current metric values from the [system.events](../system-tables/events.md#system_tables-events) table.
- `asynchronous_metrics` flag for exporting current metric values from the [system.asynchronous_metrics](../system-tables/asynchronous_metrics.md#system_tables-asynchronous_metrics) table.
- `errors` - flag for exporting the number of errors (by error code) that have occurred since the last server restart. This information can also be obtained from the [system.errors](../system-tables/errors.md#system_tables-errors) table.
**Example**
@ -1225,6 +1226,7 @@ ClickHouse использует потоки из глобального пул
<metrics>true</metrics>
<events>true</events>
<asynchronous_metrics>true</asynchronous_metrics>
<errors>true</errors>
</prometheus>
```
@ -1676,7 +1678,7 @@ TCP порт для защищённого обмена данными с кли
## user_files_path {#server_configuration_parameters-user_files_path}
The directory with user files. Used in the table function [file()](../../operations/server-configuration-parameters/settings.md).
The directory with user files. Used in the table functions [file()](../../sql-reference/table-functions/file.md) and [fileCluster()](../../sql-reference/table-functions/fileCluster.md).
**Example**

View File

@ -13,7 +13,7 @@ sidebar_label: file
**Syntax**
``` sql
file(path [,format] [,structure])
file(path [,format] [,structure] [,compression])
```
**Parameters**
@ -21,6 +21,7 @@ file(path [,format] [,structure])
- `path` — The relative path to the file from [user_files_path](../../sql-reference/table-functions/file.md#server_configuration_parameters-user_files_path). In read-only mode the path supports the following globs: `*`, `?`, `{abc,def}` and `{N..M}`, where `N`, `M` are numbers and `'abc', 'def'` are strings.
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Table structure. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression` — The compression type used for a SELECT query, or the desired compression type for an INSERT query. Supported compression types: `gz`, `br`, `xz`, `zst`, `lz4` and `bz2`.
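For illustration, a minimal sketch of reading a compressed file with an explicit compression type (the file name is hypothetical):
```sql
SELECT * FROM file('data.csv.gz', 'CSV', 'i UInt32, s String', 'gz');
```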
**Returned value**

View File

@ -0,0 +1,84 @@
---
slug: /ru/sql-reference/table-functions/fileCluster
sidebar_position: 38
sidebar_label: fileCluster
---
# fileCluster
Enables simultaneous processing of files matching a specified path across multiple nodes within a cluster. The initiator establishes connections to worker nodes, expands globs in the file path, and delegates file-reading tasks to worker nodes. Each worker node queries the initiator for the next file to process, repeating until all tasks are completed (that is, until all files have been read).
:::note
This table function operates _correctly_ only if the set of files matching the initially specified path is identical across all nodes and the content of these files is identical on different nodes. If these files differ between nodes, the result is not predetermined and depends on the order in which worker nodes request tasks from the initiator.
:::
**Syntax**
``` sql
fileCluster(cluster_name, path[, format, structure, compression_method])
```
**Arguments**
- `cluster_name` — Name of the cluster that is used to build a set of addresses and connection parameters to remote and local servers.
- `path` — The relative path to the file from [user_files_path](../../sql-reference/table-functions/file.md#server_configuration_parameters-user_files_path). The path to the file also supports [globs](#globs_in_path).
- `format` — The [format](../../interfaces/formats.md#formats) of the file.
- `structure` — Table structure. Format: `'column1_name column1_type, column2_name column2_type, ...'`.
- `compression_method` — Compression method. Supported types: `gz`, `br`, `xz`, `zst`, `lz4` and `bz2`.
**Returned value**
A table with the specified format and structure, containing data from the files matching the specified path.
**Example**
Given a cluster named `my_cluster` and the following value of the `user_files_path` setting:
``` bash
$ grep user_files_path /etc/clickhouse-server/config.xml
<user_files_path>/var/lib/clickhouse/user_files/</user_files_path>
```
Also, given that files `test1.csv` and `test2.csv` are located in the `user_files_path` directory on each cluster node, and their content is identical across the nodes:
```bash
$ cat /var/lib/clickhouse/user_files/test1.csv
1,"file1"
11,"file11"
$ cat /var/lib/clickhouse/user_files/test2.csv
2,"file2"
22,"file22"
```
For example, these files can be created by executing two queries on each node:
```sql
INSERT INTO TABLE FUNCTION file('test1.csv', 'CSV', 'i UInt32, s String') VALUES (1,'file1'), (11,'file11');
INSERT INTO TABLE FUNCTION file('test2.csv', 'CSV', 'i UInt32, s String') VALUES (2,'file2'), (22,'file22');
```
Now read the contents of `test1.csv` and `test2.csv` via the `fileCluster` table function:
```sql
SELECT * FROM fileCluster(
    'my_cluster', 'test{1,2}.csv', 'CSV', 'i UInt32, s String'
) ORDER BY (i, s)
```
```
┌──i─┬─s──────┐
│ 1 │ file1 │
│ 11 │ file11 │
└────┴────────┘
┌──i─┬─s──────┐
│ 2 │ file2 │
│ 22 │ file22 │
└────┴────────┘
```
## Globs in Path {#globs_in_path}
All patterns supported by the [File](../../sql-reference/table-functions/file.md#globs-in-path) table function are also supported by `fileCluster`.
**See Also**
- [File table function](../../sql-reference/table-functions/file.md)

View File

@ -41,6 +41,7 @@
<min_session_timeout_ms>10000</min_session_timeout_ms>
<session_timeout_ms>100000</session_timeout_ms>
<raft_logs_level>information</raft_logs_level>
<compress_logs>false</compress_logs>
<!-- All settings listed in https://github.com/ClickHouse/ClickHouse/blob/master/src/Coordination/CoordinationSettings.h -->
</coordination_settings>

View File

@ -51,6 +51,11 @@ enum class AccessType
M(ALTER_CLEAR_INDEX, "CLEAR INDEX", TABLE, ALTER_INDEX) \
M(ALTER_INDEX, "INDEX", GROUP, ALTER_TABLE) /* allows to execute ALTER ORDER BY or ALTER {ADD|DROP...} INDEX */\
\
M(ALTER_ADD_STATISTIC, "ALTER ADD STATISTIC", TABLE, ALTER_STATISTIC) \
M(ALTER_DROP_STATISTIC, "ALTER DROP STATISTIC", TABLE, ALTER_STATISTIC) \
M(ALTER_MATERIALIZE_STATISTIC, "ALTER MATERIALIZE STATISTIC", TABLE, ALTER_STATISTIC) \
M(ALTER_STATISTIC, "STATISTIC", GROUP, ALTER_TABLE) /* allows to execute ALTER STATISTIC */\
\
M(ALTER_ADD_PROJECTION, "ADD PROJECTION", TABLE, ALTER_PROJECTION) \
M(ALTER_DROP_PROJECTION, "DROP PROJECTION", TABLE, ALTER_PROJECTION) \
M(ALTER_MATERIALIZE_PROJECTION, "MATERIALIZE PROJECTION", TABLE, ALTER_PROJECTION) \

View File

@ -0,0 +1,82 @@
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/AggregateFunctionGroupArraySorted.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <Common/Exception.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
extern const int BAD_ARGUMENTS;
}
namespace
{
template <template <typename> class AggregateFunctionTemplate, typename ... TArgs>
AggregateFunctionPtr createWithNumericOrTimeType(const IDataType & argument_type, TArgs && ... args)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::Date) return std::make_shared<AggregateFunctionTemplate<UInt16>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::DateTime) return std::make_shared<AggregateFunctionTemplate<UInt32>>(std::forward<TArgs>(args)...);
if (which.idx == TypeIndex::IPv4) return std::make_shared<AggregateFunctionTemplate<IPv4>>(std::forward<TArgs>(args)...);
return AggregateFunctionPtr(createWithNumericType<AggregateFunctionTemplate, TArgs...>(argument_type, std::forward<TArgs>(args)...));
}
template <typename ... TArgs>
inline AggregateFunctionPtr createAggregateFunctionGroupArraySortedImpl(const DataTypePtr & argument_type, const Array & parameters, TArgs ... args)
{
if (auto res = createWithNumericOrTimeType<GroupArraySortedNumericImpl>(*argument_type, argument_type, parameters, std::forward<TArgs>(args)...))
return AggregateFunctionPtr(res);
WhichDataType which(argument_type);
return std::make_shared<GroupArraySortedGeneralImpl<GroupArraySortedNodeGeneral>>(argument_type, parameters, std::forward<TArgs>(args)...);
}
AggregateFunctionPtr createAggregateFunctionGroupArraySorted(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
UInt64 max_elems = std::numeric_limits<UInt64>::max();
if (parameters.empty())
{
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should have limit argument", name);
}
else if (parameters.size() == 1)
{
auto type = parameters[0].getType();
if (type != Field::Types::Int64 && type != Field::Types::UInt64)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive number", name);
if ((type == Field::Types::Int64 && parameters[0].get<Int64>() < 0) ||
(type == Field::Types::UInt64 && parameters[0].get<UInt64>() == 0))
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Parameter for aggregate function {} should be positive number", name);
max_elems = parameters[0].get<UInt64>();
}
else
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Function {} does not support this number of arguments", name);
return createAggregateFunctionGroupArraySortedImpl(argument_types[0], parameters, max_elems);
}
}
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = false };
factory.registerFunction("groupArraySorted", { createAggregateFunctionGroupArraySorted, properties });
}
}

View File

@ -0,0 +1,355 @@
#pragma once
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/ReadBufferFromString.h>
#include <IO/WriteBufferFromString.h>
#include <IO/Operators.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnString.h>
#include <Columns/ColumnVector.h>
#include <Functions/array/arraySort.h>
#include <Common/Exception.h>
#include <Common/ArenaAllocator.h>
#include <Common/assert_cast.h>
#include <Columns/ColumnConst.h>
#include <DataTypes/IDataType.h>
#include <base/sort.h>
#include <Columns/IColumn.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <Common/RadixSort.h>
#include <algorithm>
#include <type_traits>
#include <utility>
#define AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE 0xFFFFFF
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int TOO_LARGE_ARRAY_SIZE;
}
template <typename T>
struct GroupArraySortedData;
template <typename T>
struct GroupArraySortedData
{
/// For easy serialization.
static_assert(std::has_unique_object_representations_v<T> || std::is_floating_point_v<T>);
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(T), 4096>;
using Array = PODArray<T, 32, Allocator>;
Array value;
};
template <typename T>
class GroupArraySortedNumericImpl final
: public IAggregateFunctionDataHelper<GroupArraySortedData<T>, GroupArraySortedNumericImpl<T>>
{
using Data = GroupArraySortedData<T>;
UInt64 max_elems;
SerializationPtr serialization;
public:
explicit GroupArraySortedNumericImpl(
const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<GroupArraySortedData<T>, GroupArraySortedNumericImpl<T>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, max_elems(max_elems_)
, serialization(data_type_->getDefaultSerialization())
{
}
String getName() const override { return "groupArraySorted"; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
const auto & row_value = assert_cast<const ColumnVector<T> &>(*columns[0]).getData()[row_num];
auto & cur_elems = this->data(place);
cur_elems.value.push_back(row_value, arena);
/// To optimize, we sort (2 * max_size) elements of input array over and over again
/// and after each loop we delete the last half of sorted array
if (cur_elems.value.size() >= max_elems * 2)
{
RadixSort<RadixSortNumTraits<T>>::executeLSD(cur_elems.value.data(), cur_elems.value.size());
cur_elems.value.resize(max_elems, arena);
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = this->data(place);
auto & rhs_elems = this->data(rhs);
if (rhs_elems.value.empty())
return;
if (rhs_elems.value.size())
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);
RadixSort<RadixSortNumTraits<T>>::executeLSD(cur_elems.value.data(), cur_elems.value.size());
size_t elems_size = cur_elems.value.size() < max_elems ? cur_elems.value.size() : max_elems;
cur_elems.value.resize(elems_size, arena);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & value = this->data(place).value;
size_t size = value.size();
writeVarUInt(size, buf);
for (const auto & elem : value)
writeBinaryLittleEndian(elem, buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
auto & value = this->data(place).value;
value.resize(size, arena);
for (auto & element : value)
readBinaryLittleEndian(element, buf);
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
auto& value = this->data(place).value;
RadixSort<RadixSortNumTraits<T>>::executeLSD(value.data(), value.size());
size_t elems_size = value.size() < max_elems ? value.size() : max_elems;
value.resize(elems_size, arena);
size_t size = value.size();
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
offsets_to.push_back(offsets_to.back() + size);
if (size)
{
typename ColumnVector<T>::Container & data_to = assert_cast<ColumnVector<T> &>(arr_to.getData()).getData();
data_to.insert(this->data(place).value.begin(), this->data(place).value.end());
RadixSort<RadixSortNumTraits<T>>::executeLSD(value.data(), value.size());
value.resize(elems_size, arena);
}
}
bool allocatesMemoryInArena() const override { return true; }
};
template <typename Node, bool has_sampler>
struct GroupArraySortedGeneralData;
template <typename Node>
struct GroupArraySortedGeneralData<Node, false>
{
// Switch to ordinary Allocator after 4096 bytes to avoid fragmentation and trash in Arena
using Allocator = MixedAlignedArenaAllocator<alignof(Node *), 4096>;
using Array = PODArray<Field, 32, Allocator>;
Array value;
};
template <typename Node>
struct GroupArraySortedNodeBase
{
UInt64 size; // size of payload
/// Returns pointer to actual payload
char * data() { return reinterpret_cast<char *>(this) + sizeof(Node); }
const char * data() const { return reinterpret_cast<const char *>(this) + sizeof(Node); }
};
struct GroupArraySortedNodeString : public GroupArraySortedNodeBase<GroupArraySortedNodeString>
{
using Node = GroupArraySortedNodeString;
};
struct GroupArraySortedNodeGeneral : public GroupArraySortedNodeBase<GroupArraySortedNodeGeneral>
{
using Node = GroupArraySortedNodeGeneral;
};
/// Implementation of groupArraySorted for Generic data via Array
template <typename Node>
class GroupArraySortedGeneralImpl final
: public IAggregateFunctionDataHelper<GroupArraySortedGeneralData<Node, false>, GroupArraySortedGeneralImpl<Node>>
{
using Data = GroupArraySortedGeneralData<Node, false>;
static Data & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<Data *>(place); }
static const Data & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const Data *>(place); }
DataTypePtr & data_type;
UInt64 max_elems;
SerializationPtr serialization;
public:
GroupArraySortedGeneralImpl(const DataTypePtr & data_type_, const Array & parameters_, UInt64 max_elems_ = std::numeric_limits<UInt64>::max())
: IAggregateFunctionDataHelper<GroupArraySortedGeneralData<Node, false>, GroupArraySortedGeneralImpl<Node>>(
{data_type_}, parameters_, std::make_shared<DataTypeArray>(data_type_))
, data_type(this->argument_types[0])
, max_elems(max_elems_)
, serialization(data_type->getDefaultSerialization())
{
}
String getName() const override { return "groupArraySorted"; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & cur_elems = data(place);
cur_elems.value.push_back(columns[0][0][row_num], arena);
/// To optimize, we sort (2 * max_size) elements of input array over and over again and
/// after each loop we delete the last half of sorted array
if (cur_elems.value.size() >= max_elems * 2)
{
std::sort(cur_elems.value.begin(), cur_elems.value.begin() + (max_elems * 2));
cur_elems.value.erase(cur_elems.value.begin() + max_elems, cur_elems.value.begin() + (max_elems * 2));
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & cur_elems = data(place);
auto & rhs_elems = data(rhs);
if (rhs_elems.value.empty())
return;
UInt64 new_elems = rhs_elems.value.size();
for (UInt64 i = 0; i < new_elems; ++i)
cur_elems.value.push_back(rhs_elems.value[i], arena);
checkArraySize(cur_elems.value.size(), AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
if (!cur_elems.value.empty())
{
std::sort(cur_elems.value.begin(), cur_elems.value.end());
if (cur_elems.value.size() > max_elems)
cur_elems.value.resize(max_elems, arena);
}
}
static void checkArraySize(size_t elems, size_t max_elems)
{
if (unlikely(elems > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE,
"Too large array size {} (maximum: {})", elems, max_elems);
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & value = data(place).value;
size_t size = value.size();
checkArraySize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
writeVarUInt(size, buf);
for (const Field & elem : value)
{
if (elem.isNull())
{
writeBinary(false, buf);
}
else
{
writeBinary(true, buf);
serialization->serializeBinary(elem, buf, {});
}
}
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
size_t size = 0;
readVarUInt(size, buf);
if (unlikely(size > max_elems))
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Too large array size, it should not exceed {}", max_elems);
checkArraySize(size, AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE);
auto & value = data(place).value;
value.resize(size, arena);
for (Field & elem : value)
{
UInt8 is_null = 0;
readBinary(is_null, buf);
if (!is_null)
serialization->deserializeBinary(elem, buf, {});
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena * arena) const override
{
auto & column_array = assert_cast<ColumnArray &>(to);
auto & value = data(place).value;
if (!value.empty())
{
std::sort(value.begin(), value.end());
if (value.size() > max_elems)
value.resize_exact(max_elems, arena);
}
auto & offsets = column_array.getOffsets();
offsets.push_back(offsets.back() + value.size());
auto & column_data = column_array.getData();
if (std::is_same_v<Node, GroupArraySortedNodeString>)
{
auto & string_offsets = assert_cast<ColumnString &>(column_data).getOffsets();
string_offsets.reserve(string_offsets.size() + value.size());
}
for (const Field& field : value)
column_data.insert(field);
}
bool allocatesMemoryInArena() const override { return true; }
};
#undef AGGREGATE_FUNCTION_GROUP_ARRAY_MAX_ELEMENT_SIZE
}

View File

@ -100,7 +100,16 @@ public:
if (has_null_types)
{
/// Currently the only functions that returns not-NULL on all NULL arguments are count and uniq, and they returns UInt64.
/** Some functions, such as `count`, `uniq`, and others, return 0 :: UInt64 instead of NULL for a NULL argument.
* These functions have the `returns_default_when_only_null` property, so we explicitly specify the result type
* when replacing the function with `nothing`.
*
* Note: It's a bit dangerous to have the function result type depend on properties because we do not serialize properties in AST,
* and we can lose this information. For example, when we have `count(NULL)` replaced with `nothing(NULL) as "count(NULL)"` and send it
* to the remote server, the remote server will execute `nothing(NULL)` and return `NULL` while `0` is expected.
*
* To address this, we handle `nothing` in a special way in `FunctionNode::toASTImpl`.
*/
if (properties.returns_default_when_only_null)
return std::make_shared<AggregateFunctionNothing>(arguments, params, std::make_shared<DataTypeUInt64>());
else
@ -144,11 +153,18 @@ public:
}
else
{
return std::make_shared<AggregateFunctionNullVariadic<false, true>>(nested_function, arguments, params);
#if 0
if (serialize_flag)
return std::make_shared<AggregateFunctionNullVariadic<false, true>>(nested_function, arguments, params);
else
/// This should be <false, false> (no serialize flag) but it was initially added incorrectly and
/// changing it would break the binary compatibility of aggregation states using this method
/// (such as AggregateFunction(argMaxOrNull, Nullable(Int64), UInt64)). The extra flag is harmless
return std::make_shared<AggregateFunctionNullVariadic<false, true>>(nested_function, arguments, params);
}
#endif
}
}
}
};

View File

@ -43,6 +43,7 @@ namespace ErrorCodes
template <typename T>
class QuantileTDigest
{
friend class TDigestStatistic;
using Value = Float32;
using Count = Float32;
using BetterFloat = Float64; // For intermediate results and sum(Count). Must have better precision, than Count
@ -334,6 +335,44 @@ public:
compress(); // Allows reading/writing TDigests with different epsilon/max_centroids params
}
Float64 getCountLessThan(Float64 value) const
{
bool first = true;
Count sum = 0;
Count prev_count = 0;
Float64 prev_x = 0;
Value prev_mean = 0;
for (const auto & c : centroids)
{
/// std::cerr << "c "<< c.mean << " "<< c.count << std::endl;
Float64 current_x = sum + c.count * 0.5;
if (c.mean >= value)
{
/// value is smaller than any value.
if (first)
return 0;
Float64 left = prev_x + 0.5 * (prev_count == 1);
Float64 right = current_x - 0.5 * (c.count == 1);
Float64 result = checkOverflow<Float64>(interpolate(
static_cast<Value>(value),
prev_mean,
static_cast<Value>(left),
c.mean,
static_cast<Value>(right)));
return result;
}
sum += c.count;
prev_mean = c.mean;
prev_count = c.count;
prev_x = current_x;
first = false;
}
/// count is larger than any value.
return count;
}
/** Calculates the quantile q [0, 1] based on the digest.
* For an empty digest returns NaN.
*/

View File

@ -15,6 +15,7 @@ void registerAggregateFunctionCount(AggregateFunctionFactory &);
void registerAggregateFunctionDeltaSum(AggregateFunctionFactory &);
void registerAggregateFunctionDeltaSumTimestamp(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory);
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
@ -111,6 +112,7 @@ void registerAggregateFunctions()
registerAggregateFunctionDeltaSum(factory);
registerAggregateFunctionDeltaSumTimestamp(factory);
registerAggregateFunctionGroupArray(factory);
registerAggregateFunctionGroupArraySorted(factory);
registerAggregateFunctionGroupUniqArray(factory);
registerAggregateFunctionGroupArrayInsertAt(factory);
registerAggregateFunctionsQuantile(factory);

View File

@ -203,6 +203,18 @@ ASTPtr FunctionNode::toASTImpl(const ConvertToASTOptions & options) const
function_ast->name = function_name;
if (function_name == "nothing")
{
/** Inside AggregateFunctionCombinatorNull we may replace functions with `NULL` in arguments with `nothing`.
* Result type of `nothing` depends on `returns_default_when_only_null` property of nested function.
* If we convert `nothing` to AST, we will lose this information, so we use original function name instead.
*/
const auto & original_ast = getOriginalAST();
const auto & original_function_ast = original_ast ? original_ast->as<ASTFunction>() : nullptr;
if (original_function_ast)
function_ast->name = original_function_ast->name;
}
if (isWindowFunction())
{
function_ast->is_window_function = true;

View File

@ -278,6 +278,7 @@ QueryTreeNodePtr IQueryTreeNode::cloneAndReplace(const ReplacementMap & replacem
if (it != replacement_map.end())
continue;
node_clone->original_ast = node_to_clone->original_ast;
node_clone->setAlias(node_to_clone->alias);
node_clone->children = node_to_clone->children;
node_clone->weak_pointers = node_to_clone->weak_pointers;
@ -318,6 +319,7 @@ QueryTreeNodePtr IQueryTreeNode::cloneAndReplace(const ReplacementMap & replacem
*weak_pointer_ptr = it->second;
}
result_cloned_node_place->original_ast = original_ast;
return result_cloned_node_place;
}

View File

@ -55,6 +55,10 @@ void WithRetries::renewZooKeeper(FaultyKeeper my_faulty_zookeeper) const
callback(my_faulty_zookeeper);
}
else
{
my_faulty_zookeeper->setKeeper(zookeeper);
}
}
const WithRetries::KeeperSettings & WithRetries::getKeeperSettings() const

View File

@ -222,6 +222,7 @@ add_object_library(clickhouse_storages Storages)
add_object_library(clickhouse_storages_mysql Storages/MySQL)
add_object_library(clickhouse_storages_distributed Storages/Distributed)
add_object_library(clickhouse_storages_mergetree Storages/MergeTree)
add_object_library(clickhouse_storages_statistics Storages/Statistics)
add_object_library(clickhouse_storages_liveview Storages/LiveView)
add_object_library(clickhouse_storages_windowview Storages/WindowView)
add_object_library(clickhouse_storages_s3queue Storages/S3Queue)

View File

@ -2861,7 +2861,7 @@ void ClientBase::init(int argc, char ** argv)
("interactive", "Process queries-file or --query query and start interactive mode")
("pager", po::value<std::string>(), "Pipe all output into this command (less or similar)")
("max_memory_usage_in_client", po::value<int>(), "Set memory limit in client/local server")
("max_memory_usage_in_client", po::value<std::string>(), "Set memory limit in client/local server")
;
addOptions(options_description);
@ -2996,10 +2996,12 @@ void ClientBase::init(int argc, char ** argv)
clearPasswordFromCommandLine(argc, argv);
/// Limit on total memory usage
size_t max_client_memory_usage = config().getInt64("max_memory_usage_in_client", 0 /*default value*/);
if (max_client_memory_usage != 0)
std::string max_client_memory_usage = config().getString("max_memory_usage_in_client", "0" /*default value*/);
if (max_client_memory_usage != "0")
{
total_memory_tracker.setHardLimit(max_client_memory_usage);
UInt64 max_client_memory_usage_int = parseWithSizeSuffix<UInt64>(max_client_memory_usage.c_str(), max_client_memory_usage.length());
total_memory_tracker.setHardLimit(max_client_memory_usage_int);
total_memory_tracker.setDescription("(total)");
total_memory_tracker.setMetric(CurrentMetrics::MemoryTracking);
}

View File

@ -587,6 +587,8 @@
M(705, TABLE_NOT_EMPTY) \
M(706, LIBSSH_ERROR) \
M(707, GCP_ERROR) \
M(708, ILLEGAL_STATISTIC) \
\
M(999, KEEPER_EXCEPTION) \
M(1000, POCO_EXCEPTION) \
M(1001, STD_EXCEPTION) \

View File

@ -1,4 +1,5 @@
#include <Common/LockMemoryExceptionInThread.h>
#include <base/defines.h>
/// LockMemoryExceptionInThread
thread_local uint64_t LockMemoryExceptionInThread::counter = 0;
@ -18,3 +19,19 @@ LockMemoryExceptionInThread::~LockMemoryExceptionInThread()
level = previous_level;
block_fault_injections = previous_block_fault_injections;
}
void LockMemoryExceptionInThread::addUniqueLock(VariableContext level_, bool block_fault_injections_)
{
chassert(counter == 0);
counter = 1;
level = level_;
block_fault_injections = block_fault_injections_;
}
void LockMemoryExceptionInThread::removeUniqueLock()
{
chassert(counter == 1);
counter = 0;
level = VariableContext::Global;
block_fault_injections = false;
}

View File

@ -33,6 +33,9 @@ public:
LockMemoryExceptionInThread(const LockMemoryExceptionInThread &) = delete;
LockMemoryExceptionInThread & operator=(const LockMemoryExceptionInThread &) = delete;
static void addUniqueLock(VariableContext level_ = VariableContext::User, bool block_fault_injections_ = true);
static void removeUniqueLock();
static bool isBlocked(VariableContext current_level, bool fault_injection)
{
return counter > 0 && current_level >= level && (!fault_injection || block_fault_injections);

View File

@ -516,7 +516,7 @@ public:
if (record.header.version > CURRENT_CHANGELOG_VERSION)
throw Exception(
ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", record.header.version, filepath);
ErrorCodes::UNKNOWN_FORMAT_VERSION, "Unsupported changelog version {} on path {}", static_cast<uint8_t>(record.header.version), filepath);
/// Read data
if (record.header.blob_size != 0)
@ -1480,4 +1480,9 @@ void Changelog::setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_serv
raft_server = raft_server_;
}
bool Changelog::isInitialized() const
{
return initialized;
}
}

View File

@ -153,6 +153,8 @@ public:
void setRaftServer(const nuraft::ptr<nuraft::raft_server> & raft_server_);
bool isInitialized() const;
/// Fsync log to disk
~Changelog();

View File

@ -127,7 +127,8 @@ void KeeperLogStore::shutdownChangelog()
bool KeeperLogStore::flushChangelogAndShutdown()
{
std::lock_guard lock(changelog_lock);
changelog.flush();
if (changelog.isInitialized())
changelog.flush();
changelog.shutdown();
return true;
}

View File

@ -329,6 +329,20 @@ void KeeperServer::launchRaftServer(const Poco::Util::AbstractConfiguration & co
params.return_method_ = nuraft::raft_params::async_handler;
nuraft::asio_service::options asio_opts{};
/// If asio worker threads fail in any way, NuRaft will stop to make any progress
/// For that reason we need to suppress out of memory exceptions in such threads
/// TODO: use `get_active_workers` to detect when we have no active workers to abort
asio_opts.worker_start_ = [](uint32_t /*worker_id*/)
{
LockMemoryExceptionInThread::addUniqueLock(VariableContext::Global);
};
asio_opts.worker_stop_ = [](uint32_t /*worker_id*/)
{
LockMemoryExceptionInThread::removeUniqueLock();
};
if (state_manager->isSecure())
{
#if USE_SSL

View File

@ -779,7 +779,7 @@ void KeeperSnapshotManager::removeSnapshot(uint64_t log_idx)
if (itr == existing_snapshots.end())
throw Exception(ErrorCodes::UNKNOWN_SNAPSHOT, "Unknown snapshot with log index {}", log_idx);
const auto & [path, disk] = itr->second;
disk->removeFile(path);
disk->removeFileIfExists(path);
existing_snapshots.erase(itr);
}
@ -809,8 +809,16 @@ SnapshotFileInfo KeeperSnapshotManager::serializeSnapshotToDisk(const KeeperStor
disk->removeFile(tmp_snapshot_file_name);
existing_snapshots.emplace(up_to_log_idx, SnapshotFileInfo{snapshot_file_name, disk});
removeOutdatedSnapshotsIfNeeded();
moveSnapshotsIfNeeded();
try
{
removeOutdatedSnapshotsIfNeeded();
moveSnapshotsIfNeeded();
}
catch (...)
{
tryLogCurrentException(log, "Failed to cleanup and/or move older snapshots");
}
return {snapshot_file_name, disk};
}

View File

@ -139,6 +139,9 @@ class IColumn;
M(Bool, enable_multiple_prewhere_read_steps, true, "Move more conditions from WHERE to PREWHERE and do reads from disk and filtering in multiple steps if there are multiple conditions combined with AND", 0) \
M(Bool, move_primary_key_columns_to_end_of_prewhere, true, "Move PREWHERE conditions containing primary key columns to the end of AND chain. It is likely that these conditions are taken into account during primary key analysis and thus will not contribute a lot to PREWHERE filtering.", 0) \
\
M(Bool, allow_statistic_optimize, false, "Allows using statistic to optimize queries", 0) \
M(Bool, allow_experimental_statistic, false, "Allows using statistic", 0) \
\
M(UInt64, alter_sync, 1, "Wait for actions to manipulate the partitions. 0 - do not wait, 1 - wait for execution only of itself, 2 - wait for everyone.", 0) ALIAS(replication_alter_partitions_sync) \
M(Int64, replication_wait_for_inactive_replica_timeout, 120, "Wait for inactive replica to execute ALTER/OPTIMIZE. Time in seconds, 0 - do not wait, negative - wait for unlimited time.", 0) \
M(Bool, alter_move_to_space_execute_async, false, "Execute ALTER TABLE MOVE ... TO [DISK|VOLUME] asynchronously", 0) \
@ -593,7 +596,7 @@ class IColumn;
M(Bool, allow_experimental_database_materialized_mysql, false, "Allow to create database with Engine=MaterializedMySQL(...).", 0) \
M(Bool, allow_experimental_database_materialized_postgresql, false, "Allow to create database with Engine=MaterializedPostgreSQL(...).", 0) \
M(Bool, system_events_show_zero_values, false, "When querying system.events or system.metrics tables, include all metrics, even with zero values.", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, 0, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal', 'datetime64', 'date2Date32' or 'date2String'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
M(MySQLDataTypesSupport, mysql_datatypes_support_level, MySQLDataTypesSupportList{}, "Which MySQL types should be converted to corresponding ClickHouse types (rather than being represented as String). Can be empty or any combination of 'decimal', 'datetime64', 'date2Date32' or 'date2String'. When empty MySQL's DECIMAL and DATETIME/TIMESTAMP with non-zero precision are seen as String on ClickHouse's side.", 0) \
M(Bool, optimize_trivial_insert_select, true, "Optimize trivial 'INSERT INTO table SELECT ... FROM TABLES' query", 0) \
M(Bool, allow_non_metadata_alters, true, "Allow to execute alters which affects not only tables metadata, but also data on disk", 0) \
M(Bool, enable_global_with_statement, true, "Propagate WITH statements to UNION queries and all subqueries", 0) \
@ -824,7 +827,6 @@ class IColumn;
M(UInt64, grace_hash_join_initial_buckets, 1, "Initial number of grace hash join buckets", 0) \
M(UInt64, grace_hash_join_max_buckets, 1024, "Limit on the number of grace hash join buckets", 0) \
M(Bool, optimize_distinct_in_order, true, "Enable DISTINCT optimization if some columns in DISTINCT form a prefix of sorting. For example, prefix of sorting key in merge tree or ORDER BY statement", 0) \
M(Bool, allow_experimental_undrop_table_query, true, "Allow to use undrop query to restore dropped table in a limited time", 0) \
M(Bool, keeper_map_strict_mode, false, "Enforce additional checks during operations on KeeperMap. E.g. throw an exception on an insert for already existing key", 0) \
M(UInt64, extract_kvp_max_pairs_per_row, 1000, "Max number pairs that can be produced by extractKeyValuePairs function. Used to safeguard against consuming too much memory.", 0) \
M(Timezone, session_timezone, "", "This setting can be removed in the future due to potential caveats. It is experimental and is not suitable for production usage. The default timezone for current session or query. The server default timezone if empty.", 0) \
@ -895,6 +897,7 @@ class IColumn;
MAKE_OBSOLETE(M, Bool, query_plan_optimize_projection, true) \
MAKE_OBSOLETE(M, Bool, query_cache_store_results_of_queries_with_nondeterministic_functions, false) \
MAKE_OBSOLETE(M, Bool, optimize_move_functions_out_of_any, false) \
MAKE_OBSOLETE(M, Bool, allow_experimental_undrop_table_query, true) \
/** The section above is for obsolete settings. Do not add anything there. */

View File

@ -459,22 +459,25 @@ template <typename Enum, typename Traits>
struct SettingFieldMultiEnum
{
using EnumType = Enum;
using ValueType = MultiEnum<Enum>;
using StorageType = typename ValueType::StorageType;
using ValueType = std::vector<Enum>;
ValueType value;
bool changed = false;
explicit SettingFieldMultiEnum(ValueType v = ValueType{}) : value{v} {}
explicit SettingFieldMultiEnum(EnumType e) : value{e} {}
explicit SettingFieldMultiEnum(StorageType s) : value(s) {}
explicit SettingFieldMultiEnum(const Field & f) : value(parseValueFromString(f.safeGet<const String &>())) {}
operator ValueType() const { return value; } /// NOLINT
explicit operator StorageType() const { return value.getValue(); }
explicit operator Field() const { return toString(); }
operator MultiEnum<EnumType>() const /// NOLINT
{
MultiEnum<EnumType> res;
for (const auto & v : value)
res.set(v);
return res;
}
SettingFieldMultiEnum & operator= (StorageType x) { changed = true; value.setValue(x); return *this; }
SettingFieldMultiEnum & operator= (ValueType x) { changed = true; value = x; return *this; }
SettingFieldMultiEnum & operator= (const Field & x) { parseFromString(x.safeGet<const String &>()); return *this; }
@ -482,14 +485,10 @@ struct SettingFieldMultiEnum
{
static const String separator = ",";
String result;
for (StorageType i = 0; i < Traits::getEnumSize(); ++i)
for (const auto & v : value)
{
const auto v = static_cast<Enum>(i);
if (value.isSet(v))
{
result += Traits::toString(v);
result += separator;
}
result += Traits::toString(v);
result += separator;
}
if (!result.empty())
@ -508,6 +507,7 @@ private:
static const String separators=", ";
ValueType result;
std::unordered_set<EnumType> values_set;
// To avoid allocating memory on substr()
const std::string_view str_view{str};
@ -519,7 +519,12 @@ private:
if (value_end == std::string::npos)
value_end = str_view.size();
result.set(Traits::fromString(str_view.substr(value_start, value_end - value_start)));
auto value = Traits::fromString(str_view.substr(value_start, value_end - value_start));
/// Deduplicate values
auto [_, inserted] = values_set.emplace(value);
if (inserted)
result.push_back(value);
value_start = str_view.find_first_not_of(separators, value_end);
}
@ -554,7 +559,8 @@ void SettingFieldMultiEnum<EnumT, Traits>::readBinary(ReadBuffer & in)
static EnumType fromString(std::string_view str); \
}; \
\
using SettingField##NEW_NAME = SettingFieldMultiEnum<ENUM_TYPE, SettingField##NEW_NAME##Traits>;
using SettingField##NEW_NAME = SettingFieldMultiEnum<ENUM_TYPE, SettingField##NEW_NAME##Traits>; \
using NEW_NAME##List = typename SettingField##NEW_NAME::ValueType;
/// NOLINTNEXTLINE
#define IMPLEMENT_SETTING_MULTI_ENUM(ENUM_TYPE, ERROR_CODE_FOR_UNEXPECTED_NAME, ...) \

View File

@ -27,16 +27,16 @@ bool operator== (const Field & f, const SettingFieldMultiEnum<Enum, Traits> & se
}
GTEST_TEST(MySQLDataTypesSupport, WithDefault)
GTEST_TEST(SettingMySQLDataTypesSupport, WithDefault)
{
// Setting can be default-initialized and that means all values are unset.
const SettingMySQLDataTypesSupport setting;
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{}, setting.value);
ASSERT_EQ("", setting.toString());
ASSERT_EQ(setting, Field(""));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithDECIMAL)
@ -44,10 +44,10 @@ GTEST_TEST(SettingMySQLDataTypesSupport, WithDECIMAL)
// Setting can be initialized with MySQLDataTypesSupport::DECIMAL
// and this value can be obtained in various forms with getters.
const SettingMySQLDataTypesSupport setting(MySQLDataTypesSupport::DECIMAL);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{MySQLDataTypesSupport::DECIMAL}, setting.value);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
@ -57,95 +57,69 @@ GTEST_TEST(SettingMySQLDataTypesSupport, WithDATE)
{
SettingMySQLDataTypesSupport setting;
setting = String("date2Date32");
ASSERT_EQ(4, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{MySQLDataTypesSupport::DATE2DATE32}, setting.value);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATE2DATE32));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATE2DATE32));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("date2Date32", setting.toString());
ASSERT_EQ(Field("date2Date32"), setting);
setting = String("date2String");
ASSERT_EQ(8, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{MySQLDataTypesSupport::DATE2STRING}, setting.value);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATE2STRING));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATE2DATE32));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATE2STRING));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATE2DATE32));
ASSERT_EQ("date2String", setting.toString());
ASSERT_EQ(Field("date2String"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, With1)
{
// Setting can be initialized with int value corresponding to DECIMAL
// and the rest of the test is the same as for that value.
const SettingMySQLDataTypesSupport setting(1u);
ASSERT_EQ(1, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, WithMultipleValues)
{
// Setting can be initialized with int value corresponding to (DECIMAL | DATETIME64)
const SettingMySQLDataTypesSupport setting(3u);
ASSERT_EQ(3, setting.value.getValue());
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
}
GTEST_TEST(SettingMySQLDataTypesSupport, SetString)
{
SettingMySQLDataTypesSupport setting;
setting = String("decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = "datetime64,decimal";
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("datetime64,decimal", setting.toString());
ASSERT_EQ(Field("datetime64,decimal"), setting);
// comma with spaces
setting = " datetime64 , decimal "; /// bad punctuation is ok here
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal,datetime64", setting.toString());
ASSERT_EQ(Field("decimal,datetime64"), setting);
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("datetime64,decimal", setting.toString());
ASSERT_EQ(Field("datetime64,decimal"), setting);
setting = String(",,,,,,,, ,decimal");
ASSERT_TRUE(setting.changed);
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String(",decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,decimal,");
ASSERT_TRUE(setting.changed); //since previous value was DECIMAL
ASSERT_TRUE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_TRUE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("decimal", setting.toString());
ASSERT_EQ(Field("decimal"), setting);
setting = String("");
ASSERT_TRUE(setting.changed);
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(setting.value.isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DECIMAL));
ASSERT_FALSE(MultiEnum<MySQLDataTypesSupport>(setting).isSet(MySQLDataTypesSupport::DATETIME64));
ASSERT_EQ("", setting.toString());
ASSERT_EQ(Field(""), setting);
}
@ -156,13 +130,13 @@ GTEST_TEST(SettingMySQLDataTypesSupport, SetInvalidString)
SettingMySQLDataTypesSupport setting;
EXPECT_THROW(setting = String("FOOBAR"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{}, setting.value);
EXPECT_THROW(setting = String("decimal,datetime64,123"), Exception);
ASSERT_FALSE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{}, setting.value);
EXPECT_NO_THROW(setting = String(", "));
ASSERT_TRUE(setting.changed);
ASSERT_EQ(0, setting.value.getValue());
ASSERT_EQ(std::vector<MySQLDataTypesSupport>{}, setting.value);
}

View File

@ -60,22 +60,22 @@ void MetadataStorageFromStaticFilesWebServer::assertExists(const std::string & p
bool MetadataStorageFromStaticFilesWebServer::isFile(const std::string & path) const
{
assertExists(path);
std::shared_lock shared_lock(object_storage.metadata_mutex);
return object_storage.files.at(path).type == WebObjectStorage::FileType::File;
auto file_info = object_storage.getFileInfo(path);
return file_info.type == WebObjectStorage::FileType::File;
}
bool MetadataStorageFromStaticFilesWebServer::isDirectory(const std::string & path) const
{
assertExists(path);
std::shared_lock shared_lock(object_storage.metadata_mutex);
return object_storage.files.at(path).type == WebObjectStorage::FileType::Directory;
auto file_info = object_storage.getFileInfo(path);
return file_info.type == WebObjectStorage::FileType::Directory;
}
uint64_t MetadataStorageFromStaticFilesWebServer::getFileSize(const String & path) const
{
assertExists(path);
std::shared_lock shared_lock(object_storage.metadata_mutex);
return object_storage.files.at(path).size;
auto file_info = object_storage.getFileInfo(path);
return file_info.size;
}
StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const std::string & path) const
@ -86,8 +86,8 @@ StoredObjects MetadataStorageFromStaticFilesWebServer::getStorageObjects(const s
std::string remote_path = fs_path.parent_path() / (escapeForFileName(fs_path.stem()) + fs_path.extension().string());
remote_path = remote_path.substr(object_storage.url.size());
std::shared_lock shared_lock(object_storage.metadata_mutex);
return {StoredObject(remote_path, path, object_storage.files.at(path).size)};
auto file_info = object_storage.getFileInfo(path);
return {StoredObject(remote_path, path, file_info.size)};
}
std::vector<std::string> MetadataStorageFromStaticFilesWebServer::listDirectory(const std::string & path) const

View File

@ -28,6 +28,7 @@ namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
extern const int NOT_IMPLEMENTED;
extern const int FILE_DOESNT_EXIST;
}
void WebObjectStorage::initialize(const String & uri_path, const std::unique_lock<std::shared_mutex> & lock) const
@ -124,7 +125,19 @@ bool WebObjectStorage::exists(const StoredObject & object) const
bool WebObjectStorage::exists(const std::string & path) const
{
LOG_TRACE(&Poco::Logger::get("DiskWeb"), "Checking existence of path: {}", path);
return tryGetFileInfo(path) != std::nullopt;
}
WebObjectStorage::FileData WebObjectStorage::getFileInfo(const String & path) const
{
auto file_info = tryGetFileInfo(path);
if (!file_info)
throw Exception(ErrorCodes::FILE_DOESNT_EXIST, "No such file: {}", path);
return file_info.value();
}
std::optional<WebObjectStorage::FileData> WebObjectStorage::tryGetFileInfo(const String & path) const
{
std::shared_lock shared_lock(metadata_mutex);
if (files.find(path) == files.end())
@ -145,10 +158,10 @@ bool WebObjectStorage::exists(const std::string & path) const
}
if (files.empty())
return false;
return std::nullopt;
if (files.contains(path))
return true;
if (auto it = files.find(path); it != files.end())
return it->second;
/// `object_storage.files` contains files + directories only inside `metadata_path / uuid_3_digit / uuid /`
/// (specific table files only), but we need to be able to also tell if `exists(<metadata_path>)`, for example.
@ -158,7 +171,7 @@ bool WebObjectStorage::exists(const std::string & path) const
);
if (it == files.end())
return false;
return std::nullopt;
if (startsWith(it->first, path)
|| (it != files.begin() && startsWith(std::prev(it)->first, path)))
@ -166,20 +179,15 @@ bool WebObjectStorage::exists(const std::string & path) const
shared_lock.unlock();
std::unique_lock unique_lock(metadata_mutex);
/// The code relies on invariant that if this function returned true
/// the file exists in files.
/// In this case we have a directory which doesn't explicitly exist (like store/xxx/yyy)
/// ^^^^^
/// Adding it to the files
/// This directory path is not in the files cache; report it as a directory to simplify further checks for this path.
files.emplace(std::make_pair(path, FileData({.type = FileType::Directory})));
unique_lock.unlock();
shared_lock.lock();
return true;
return FileData{ .type = FileType::Directory };
}
return false;
return std::nullopt;
}
std::unique_ptr<ReadBufferFromFileBase> WebObjectStorage::readObjects( /// NOLINT

View File

@ -118,6 +118,9 @@ protected:
mutable Files files;
mutable std::shared_mutex metadata_mutex;
std::optional<FileData> tryGetFileInfo(const String & path) const;
FileData getFileInfo(const String & path) const;
private:
void initialize(const String & path, const std::unique_lock<std::shared_mutex> &) const;
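
A minimal sketch (not from the commit) of the intended calling pattern for the pair declared above, as the metadata-storage wrapper earlier in this diff uses it; the free functions and their names are illustrative and ignore that the members sit under `protected`:

```cpp
#include <Disks/ObjectStorages/Web/WebObjectStorage.h>
#include <optional>
#include <string>

// Illustrative helpers only: callers that can tolerate a missing path use tryGetFileInfo(),
// everything else uses getFileInfo() and relies on the FILE_DOESNT_EXIST exception,
// instead of locking metadata_mutex and reading `files` directly.
std::optional<size_t> tryGetSize(const DB::WebObjectStorage & storage, const std::string & path)
{
    if (auto file_info = storage.tryGetFileInfo(path))   // std::nullopt when the path is unknown
        return file_info->size;
    return std::nullopt;
}

bool isRegularFile(const DB::WebObjectStorage & storage, const std::string & path)
{
    return storage.getFileInfo(path).type == DB::WebObjectStorage::FileType::File;  // throws if absent
}
```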

View File

@ -293,7 +293,7 @@ InputFormatPtr FormatFactory::getInput(
// Decide whether to use ParallelParsingInputFormat.
bool parallel_parsing =
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine &&
max_parsing_threads > 1 && settings.input_format_parallel_parsing && creators.file_segmentation_engine_creator &&
!creators.random_access_input_creator && !need_only_count;
if (settings.max_memory_usage && settings.min_chunk_bytes_for_parallel_parsing * max_parsing_threads * 2 > settings.max_memory_usage)
@ -323,7 +323,7 @@ InputFormatPtr FormatFactory::getInput(
{ return input_getter(input, sample, row_input_format_params, format_settings); };
ParallelParsingInputFormat::Params params{
buf, sample, parser_creator, creators.file_segmentation_engine, name, max_parsing_threads,
buf, sample, parser_creator, creators.file_segmentation_engine_creator, name, format_settings, max_parsing_threads,
settings.min_chunk_bytes_for_parallel_parsing, max_block_size, context->getApplicationType() == Context::ApplicationType::SERVER};
format = std::make_shared<ParallelParsingInputFormat>(params);
@ -669,10 +669,22 @@ String FormatFactory::getFormatFromFileDescriptor(int fd)
void FormatFactory::registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine)
{
auto & target = dict[name].file_segmentation_engine;
auto & target = dict[name].file_segmentation_engine_creator;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine {} is already registered", name);
target = std::move(file_segmentation_engine);
auto creator = [file_segmentation_engine](const FormatSettings &)
{
return file_segmentation_engine;
};
target = std::move(creator);
}
void FormatFactory::registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator)
{
auto & target = dict[name].file_segmentation_engine_creator;
if (target)
throw Exception(ErrorCodes::LOGICAL_ERROR, "FormatFactory: File segmentation engine creator {} is already registered", name);
target = std::move(file_segmentation_engine_creator);
}
void FormatFactory::registerSchemaReader(const String & name, SchemaReaderCreator schema_reader_creator)

View File

@ -71,6 +71,9 @@ public:
size_t min_bytes,
size_t max_rows)>;
using FileSegmentationEngineCreator = std::function<FileSegmentationEngine(
const FormatSettings & settings)>;
private:
// On the input side, there are two kinds of formats:
// * InputCreator - formats parsed sequentially, e.g. CSV. Almost all formats are like this.
@ -132,7 +135,7 @@ private:
InputCreator input_creator;
RandomAccessInputCreator random_access_input_creator;
OutputCreator output_creator;
FileSegmentationEngine file_segmentation_engine;
FileSegmentationEngineCreator file_segmentation_engine_creator;
SchemaReaderCreator schema_reader_creator;
ExternalSchemaReaderCreator external_schema_reader_creator;
bool supports_parallel_formatting{false};
@ -203,6 +206,8 @@ public:
void registerFileSegmentationEngine(const String & name, FileSegmentationEngine file_segmentation_engine);
void registerFileSegmentationEngineCreator(const String & name, FileSegmentationEngineCreator file_segmentation_engine_creator);
void registerNonTrivialPrefixAndSuffixChecker(const String & name, NonTrivialPrefixAndSuffixChecker non_trivial_prefix_and_suffix_checker);
void registerAppendSupportChecker(const String & name, AppendSupportChecker append_support_checker);
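
A sketch (assumed, not from the commit) of how a format would use the new creator-based registration, mirroring the CSV and HiveText changes further down; the format name and `mySegmentationImpl` are hypothetical:

```cpp
#include <Formats/FormatFactory.h>

// Hypothetical format-specific splitter; only its shape matters here.
std::pair<bool, size_t> mySegmentationImpl(
    DB::ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows,
    const DB::FormatSettings & settings);

void registerFileSegmentationEngineMyFormat(DB::FormatFactory & factory)
{
    // The creator captures FormatSettings, so the engine it returns can honour
    // per-query settings (e.g. the new csv.allow_cr_end_of_line flag).
    factory.registerFileSegmentationEngineCreator(
        "MyFormat",
        [](const DB::FormatSettings & settings) -> DB::FormatFactory::FileSegmentationEngine
        {
            return [settings](DB::ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
            {
                return mySegmentationImpl(in, memory, min_bytes, max_rows, settings);
            };
        });
}
```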

View File

@ -225,10 +225,10 @@ private:
if constexpr (is_decimal<T>)
{
const auto * from_col = checkAndGetColumn<ColumnDecimal<T>>(arguments[0].column.get());
UInt8 from_scale = from_col->getScale();
if (from_col)
{
UInt8 from_scale = from_col->getScale();
if (precision_col_const)
vectorConstant(from_col->getData(), precision_col_const->template getValue<UInt8>(), result_chars, result_offsets, from_scale);
else if (precision_col)

View File

@ -26,7 +26,7 @@ struct URIConverter
static void modifyURI(Poco::URI & uri, std::unordered_map<std::string, std::string> mapper)
{
Macros macros({{"bucket", uri.getHost()}});
uri = macros.expand(mapper[uri.getScheme()]).empty()? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + "/" + uri.getPathAndQuery());
uri = macros.expand(mapper[uri.getScheme()]).empty() ? uri : Poco::URI(macros.expand(mapper[uri.getScheme()]) + uri.getPathAndQuery());
}
};
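
Presumably the extra "/" was dropped because Poco::URI::getPathAndQuery() already returns a leading slash; a short illustration with hypothetical values:

```cpp
#include <Poco/URI.h>
#include <string>

Poco::URI uri("s3://mybucket/data/part.csv");                 // getPathAndQuery() == "/data/part.csv"
std::string expanded = "https://mybucket.s3.amazonaws.com";   // hypothetical macros.expand(...) result
// old: expanded + "/" + uri.getPathAndQuery() -> "https://mybucket.s3.amazonaws.com//data/part.csv"
// new: expanded + uri.getPathAndQuery()       -> "https://mybucket.s3.amazonaws.com/data/part.csv"
```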

View File

@ -52,6 +52,7 @@ public:
size_t getTotalByteCount() const override;
bool alwaysReturnsEmptySet() const override;
bool supportParallelJoin() const override { return true; }
IBlocksStreamPtr
getNonJoinedBlocks(const Block & left_sample_block, const Block & result_sample_block, UInt64 max_block_size) const override;

View File

@ -587,6 +587,28 @@ struct ContextSharedPart : boost::noncopyable
for (const auto & [_, cache] : caches)
cache->cache->deactivateBackgroundOperations();
{
// Disk selector might not be initialized if there was some error during
// its initialization. Don't try to initialize it again on shutdown.
if (merge_tree_disk_selector)
{
for (const auto & [disk_name, disk] : merge_tree_disk_selector->getDisksMap())
{
LOG_INFO(log, "Shutdown disk {}", disk_name);
disk->shutdown();
}
}
/// Special volumes might also use disks that require shutdown.
auto & tmp_data = root_temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
{
auto & disks = tmp_data->getVolume()->getDisks();
for (auto & disk : disks)
disk->shutdown();
}
}
{
std::lock_guard lock(mutex);
@ -4066,26 +4088,6 @@ void Context::stopServers(const ServerType & server_type) const
void Context::shutdown() TSA_NO_THREAD_SAFETY_ANALYSIS
{
// Disk selector might not be initialized if there was some error during
// its initialization. Don't try to initialize it again on shutdown.
if (shared->merge_tree_disk_selector)
{
for (auto & [disk_name, disk] : getDisksMap())
{
LOG_INFO(shared->log, "Shutdown disk {}", disk_name);
disk->shutdown();
}
}
/// Special volumes might also use disks that require shutdown.
auto & tmp_data = shared->root_temp_data_on_disk;
if (tmp_data && tmp_data->getVolume())
{
auto & disks = tmp_data->getVolume()->getDisks();
for (auto & disk : disks)
disk->shutdown();
}
shared->shutdown();
}

View File

@ -215,7 +215,7 @@ ContextMutablePtr DDLTaskBase::makeQueryContext(ContextPtr from_context, const Z
}
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log)
bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper)
{
bool host_in_hostlist = false;
std::exception_ptr first_exception = nullptr;
@ -262,6 +262,22 @@ bool DDLTask::findCurrentHostID(ContextPtr global_context, Poco::Logger * log)
if (!host_in_hostlist && first_exception)
{
if (zookeeper->exists(getFinishedNodePath()))
{
LOG_WARNING(log, "Failed to find current host ID, but assuming that {} is finished because {} exists. Skipping the task. Error: {}",
entry_name, getFinishedNodePath(), getExceptionMessage(first_exception, /*with_stacktrace*/ true));
return false;
}
size_t finished_nodes_count = zookeeper->getChildren(fs::path(entry_path) / "finished").size();
if (entry.hosts.size() == finished_nodes_count)
{
LOG_WARNING(log, "Failed to find current host ID, but assuming that {} is finished because the number of finished nodes ({}) "
"equals to the number of hosts in list. Skipping the task. Error: {}",
entry_name, finished_nodes_count, getExceptionMessage(first_exception, /*with_stacktrace*/ true));
return false;
}
/// We don't know for sure if we should process task or not
std::rethrow_exception(first_exception);
}

View File

@ -143,7 +143,7 @@ struct DDLTask : public DDLTaskBase
{
DDLTask(const String & name, const String & path) : DDLTaskBase(name, path) {}
bool findCurrentHostID(ContextPtr global_context, Poco::Logger * log);
bool findCurrentHostID(ContextPtr global_context, Poco::Logger * log, const ZooKeeperPtr & zookeeper);
void setClusterInfo(ContextPtr context, Poco::Logger * log);

View File

@ -214,7 +214,7 @@ DDLTaskPtr DDLWorker::initAndCheckTask(const String & entry_name, String & out_r
/// Stage 2: resolve host_id and check if we should execute query or not
/// Multiple clusters can use single DDL queue path in ZooKeeper,
/// So we should skip task if we cannot find current host in cluster hosts list.
if (!task->findCurrentHostID(context, log))
if (!task->findCurrentHostID(context, log, zookeeper))
{
out_reason = "There is no local address in the host list";
return add_to_skip_set();

View File

@ -944,18 +944,16 @@ JoinPtr SelectQueryExpressionAnalyzer::appendJoin(
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block);
static std::shared_ptr<IJoin> chooseJoinAlgorithm(
std::shared_ptr<TableJoin> analyzed_join, const ColumnsWithTypeAndName & left_sample_columns, std::unique_ptr<QueryPlan> & joined_plan, ContextPtr context)
static std::shared_ptr<IJoin> tryCreateJoin(
JoinAlgorithm algorithm,
std::shared_ptr<TableJoin> analyzed_join,
const ColumnsWithTypeAndName & left_sample_columns,
const Block & right_sample_block,
std::unique_ptr<QueryPlan> & joined_plan,
ContextPtr context)
{
const auto & settings = context->getSettings();
Block right_sample_block = joined_plan->getCurrentDataStream().header;
std::vector<String> tried_algorithms;
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
if (algorithm == JoinAlgorithm::DIRECT || algorithm == JoinAlgorithm::DEFAULT)
{
tried_algorithms.push_back(toString(JoinAlgorithm::DIRECT));
JoinPtr direct_join = tryKeyValueJoin(analyzed_join, right_sample_block);
if (direct_join)
{
@ -965,54 +963,63 @@ static std::shared_ptr<IJoin> chooseJoinAlgorithm(
}
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE))
if (algorithm == JoinAlgorithm::PARTIAL_MERGE ||
algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE)
{
tried_algorithms.push_back(toString(JoinAlgorithm::PARTIAL_MERGE));
if (MergeJoin::isSupported(analyzed_join))
return std::make_shared<MergeJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::HASH) ||
if (algorithm == JoinAlgorithm::HASH ||
/// partial_merge is preferred, but can't be used for the specified kind of join, so fall back to hash
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE) ||
analyzed_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH))
algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE ||
algorithm == JoinAlgorithm::PARALLEL_HASH ||
algorithm == JoinAlgorithm::DEFAULT)
{
tried_algorithms.push_back(toString(JoinAlgorithm::HASH));
const auto & settings = context->getSettings();
if (analyzed_join->allowParallelHashJoin())
return std::make_shared<ConcurrentHashJoin>(context, analyzed_join, settings.max_threads, right_sample_block);
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE))
if (algorithm == JoinAlgorithm::FULL_SORTING_MERGE)
{
tried_algorithms.push_back(toString(JoinAlgorithm::FULL_SORTING_MERGE));
if (FullSortingMergeJoin::isSupported(analyzed_join))
return std::make_shared<FullSortingMergeJoin>(analyzed_join, right_sample_block);
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH))
if (algorithm == JoinAlgorithm::GRACE_HASH)
{
tried_algorithms.push_back(toString(JoinAlgorithm::GRACE_HASH));
// Grace hash join requires that columns exist in left_sample_block.
Block left_sample_block(left_sample_columns);
if (sanitizeBlock(left_sample_block, false) && GraceHashJoin::isSupported(analyzed_join))
return std::make_shared<GraceHashJoin>(context, analyzed_join, left_sample_block, right_sample_block, context->getTempDataOnDisk());
}
if (analyzed_join->isEnabledAlgorithm(JoinAlgorithm::AUTO))
if (algorithm == JoinAlgorithm::AUTO)
{
tried_algorithms.push_back(toString(JoinAlgorithm::AUTO));
if (MergeJoin::isSupported(analyzed_join))
return std::make_shared<JoinSwitcher>(analyzed_join, right_sample_block);
return std::make_shared<HashJoin>(analyzed_join, right_sample_block);
}
return nullptr;
}
static std::shared_ptr<IJoin> chooseJoinAlgorithm(
std::shared_ptr<TableJoin> analyzed_join, const ColumnsWithTypeAndName & left_sample_columns, std::unique_ptr<QueryPlan> & joined_plan, ContextPtr context)
{
Block right_sample_block = joined_plan->getCurrentDataStream().header;
const auto & join_algorithms = analyzed_join->getEnabledJoinAlgorithms();
for (const auto alg : join_algorithms)
{
auto join = tryCreateJoin(alg, analyzed_join, left_sample_columns, right_sample_block, joined_plan, context);
if (join)
return join;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,
"Can't execute {} join algorithm for this strictness/kind and right storage type",
fmt::join(tried_algorithms, " or "));
"Can't execute any of specified join algorithms for this strictness/kind and right storage type");
}
static std::unique_ptr<QueryPlan> buildJoinedPlan(
@ -1070,9 +1077,6 @@ static std::unique_ptr<QueryPlan> buildJoinedPlan(
std::shared_ptr<DirectKeyValueJoin> tryKeyValueJoin(std::shared_ptr<TableJoin> analyzed_join, const Block & right_sample_block)
{
if (!analyzed_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
return nullptr;
auto storage = analyzed_join->getStorageKeyValue();
if (!storage)
return nullptr;

View File

@ -155,6 +155,11 @@ BlockIO InterpreterAlterQuery::executeToTable(const ASTAlterQuery & alter)
}
else
throw Exception(ErrorCodes::LOGICAL_ERROR, "Wrong parameter type in ALTER query");
if (!getContext()->getSettings().allow_experimental_statistic && (
command_ast->type == ASTAlterCommand::ADD_STATISTIC ||
command_ast->type == ASTAlterCommand::DROP_STATISTIC ||
command_ast->type == ASTAlterCommand::MATERIALIZE_STATISTIC))
throw Exception(ErrorCodes::INCORRECT_QUERY, "Alter table with statistic is now disabled. Turn on allow_experimental_statistic");
}
if (typeid_cast<DatabaseReplicated *>(database.get()))
@ -318,6 +323,21 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_SAMPLE_BY, database, table);
break;
}
case ASTAlterCommand::ADD_STATISTIC:
{
required_access.emplace_back(AccessType::ALTER_ADD_STATISTIC, database, table);
break;
}
case ASTAlterCommand::DROP_STATISTIC:
{
required_access.emplace_back(AccessType::ALTER_DROP_STATISTIC, database, table);
break;
}
case ASTAlterCommand::MATERIALIZE_STATISTIC:
{
required_access.emplace_back(AccessType::ALTER_MATERIALIZE_STATISTIC, database, table);
break;
}
case ASTAlterCommand::ADD_INDEX:
{
required_access.emplace_back(AccessType::ALTER_ADD_INDEX, database, table);

View File

@ -437,6 +437,12 @@ ASTPtr InterpreterCreateQuery::formatColumns(const ColumnsDescription & columns)
column_declaration->children.push_back(column_declaration->codec);
}
if (column.stat)
{
column_declaration->stat_type = column.stat->ast;
column_declaration->children.push_back(column_declaration->stat_type);
}
if (column.ttl)
{
column_declaration->ttl = column.ttl;
@ -639,6 +645,13 @@ ColumnsDescription InterpreterCreateQuery::getColumnsDescription(
col_decl.codec, column.type, sanity_check_compression_codecs, allow_experimental_codecs, enable_deflate_qpl_codec);
}
if (col_decl.stat_type)
{
if (!attach && !context_->getSettingsRef().allow_experimental_statistic)
throw Exception(ErrorCodes::INCORRECT_QUERY, "Create table with statistic is now disabled. Turn on allow_experimental_statistic");
column.stat = StatisticDescription::getStatisticFromColumnDeclaration(col_decl);
}
if (col_decl.ttl)
column.ttl = col_decl.ttl;

View File

@ -66,8 +66,10 @@ namespace
static void visit(ASTSelectQuery & select, ASTPtr & node, Data & data)
{
/// We need to read statistics when `allow_statistic_optimize` is enabled.
bool only_analyze = !data.getContext()->getSettings().allow_statistic_optimize;
InterpreterSelectQuery interpreter(
node, data.getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze().modify());
node, data.getContext(), SelectQueryOptions(QueryProcessingStage::FetchColumns).analyze(only_analyze).modify());
const SelectQueryInfo & query_info = interpreter.getQueryInfo();
if (query_info.view_query)

View File

@ -658,6 +658,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
metadata_snapshot,
storage->getConditionEstimatorByPredicate(query_info, storage_snapshot, context),
queried_columns,
supported_prewhere_columns,
log};

View File

@ -23,11 +23,6 @@ InterpreterUndropQuery::InterpreterUndropQuery(const ASTPtr & query_ptr_, Contex
BlockIO InterpreterUndropQuery::execute()
{
if (!getContext()->getSettingsRef().allow_experimental_undrop_table_query)
throw Exception(ErrorCodes::SUPPORT_IS_DISABLED,
"Undrop table is experimental. "
"Set `allow_experimental_undrop_table_query` setting to enable it");
getContext()->checkAccess(AccessType::UNDROP_TABLE);
auto & undrop = query_ptr->as<ASTUndropQuery &>();
if (!undrop.cluster.empty() && !maybeRemoveOnCluster(query_ptr, getContext()))

View File

@ -308,6 +308,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
auto settings = context->getSettingsRef();
MultiEnum<JoinAlgorithm> join_algorithm = settings.join_algorithm;
bool try_use_direct_join = join_algorithm.isSet(JoinAlgorithm::DIRECT) || join_algorithm.isSet(JoinAlgorithm::DEFAULT);
auto table_join = std::make_shared<TableJoin>(settings, context->getGlobalTemporaryVolume());
const ASTTablesInSelectQueryElement * ast_join = select_query_.join();
@ -325,8 +326,8 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
table_join->setStorageJoin(storage_join);
}
if (auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
storage_dict && join_algorithm.isSet(JoinAlgorithm::DIRECT))
auto storage_dict = std::dynamic_pointer_cast<StorageDictionary>(storage);
if (storage_dict && try_use_direct_join && storage_dict->getDictionary()->getSpecialKeyType() != DictionarySpecialKeyType::Range)
{
FunctionDictHelper dictionary_helper(context);
@ -347,8 +348,7 @@ std::shared_ptr<TableJoin> JoinedTables::makeTableJoin(const ASTSelectQuery & se
table_join->setStorageJoin(dictionary_kv);
}
if (auto storage_kv = std::dynamic_pointer_cast<IKeyValueEntity>(storage);
storage_kv && join_algorithm.isSet(JoinAlgorithm::DIRECT))
if (auto storage_kv = std::dynamic_pointer_cast<IKeyValueEntity>(storage); storage_kv && try_use_direct_join)
{
table_join->setStorageJoin(storage_kv);
}

View File

@ -55,6 +55,7 @@ namespace ErrorCodes
extern const int CANNOT_UPDATE_COLUMN;
extern const int UNEXPECTED_EXPRESSION;
extern const int THERE_IS_NO_COLUMN;
extern const int ILLEGAL_STATISTIC;
}
@ -730,7 +731,7 @@ void MutationsInterpreter::prepare(bool dry_run)
}
else if (command.type == MutationCommand::MATERIALIZE_INDEX)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
auto it = std::find_if(
std::cbegin(indices_desc), std::end(indices_desc),
[&](const IndexDescription & index)
@ -750,9 +751,20 @@ void MutationsInterpreter::prepare(bool dry_run)
materialized_indices.emplace(command.index_name);
}
}
else if (command.type == MutationCommand::MATERIALIZE_STATISTIC)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
for (const auto & stat_column_name: command.statistic_columns)
{
if (!columns_desc.has(stat_column_name) || !columns_desc.get(stat_column_name).stat)
throw Exception(ErrorCodes::ILLEGAL_STATISTIC, "Unknown statistic column: {}", stat_column_name);
dependencies.emplace(stat_column_name, ColumnDependency::STATISTIC);
materialized_statistics.emplace(stat_column_name);
}
}
else if (command.type == MutationCommand::MATERIALIZE_PROJECTION)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
const auto & projection = projections_desc.get(command.projection_name);
if (!source.hasProjection(projection.name))
{
@ -763,12 +775,18 @@ void MutationsInterpreter::prepare(bool dry_run)
}
else if (command.type == MutationCommand::DROP_INDEX)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
materialized_indices.erase(command.index_name);
}
else if (command.type == MutationCommand::DROP_STATISTIC)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
for (const auto & stat_column_name: command.statistic_columns)
materialized_statistics.erase(stat_column_name);
}
else if (command.type == MutationCommand::DROP_PROJECTION)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_PROJECTION);
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
materialized_projections.erase(command.projection_name);
}
else if (command.type == MutationCommand::MATERIALIZE_TTL)
@ -818,7 +836,9 @@ void MutationsInterpreter::prepare(bool dry_run)
auto new_dependencies = metadata_snapshot->getColumnDependencies(new_updated_columns, true, has_dependency);
for (const auto & dependency : new_dependencies)
{
if (dependency.kind == ColumnDependency::SKIP_INDEX || dependency.kind == ColumnDependency::PROJECTION)
if (dependency.kind == ColumnDependency::SKIP_INDEX
|| dependency.kind == ColumnDependency::PROJECTION
|| dependency.kind == ColumnDependency::STATISTIC)
dependencies.insert(dependency);
}
}
@ -1358,7 +1378,7 @@ QueryPipelineBuilder MutationsInterpreter::execute()
Block MutationsInterpreter::getUpdatedHeader() const
{
// If it's an index/statistic/projection materialization, we don't write any data columns, thus an empty header is used
return mutation_kind.mutation_kind == MutationKind::MUTATE_INDEX_PROJECTION ? Block{} : *updated_header;
return mutation_kind.mutation_kind == MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION ? Block{} : *updated_header;
}
const ColumnDependencies & MutationsInterpreter::getColumnDependencies() const

View File

@ -91,6 +91,8 @@ public:
NameSet grabMaterializedIndices() { return std::move(materialized_indices); }
NameSet grabMaterializedStatistics() { return std::move(materialized_statistics); }
NameSet grabMaterializedProjections() { return std::move(materialized_projections); }
struct MutationKind
@ -98,7 +100,7 @@ public:
enum MutationKindEnum
{
MUTATE_UNKNOWN,
MUTATE_INDEX_PROJECTION,
MUTATE_INDEX_STATISTIC_PROJECTION,
MUTATE_OTHER,
} mutation_kind = MUTATE_UNKNOWN;
@ -214,6 +216,7 @@ private:
NameSet materialized_indices;
NameSet materialized_projections;
NameSet materialized_statistics;
MutationKind mutation_kind; /// Do we meet any index, statistic or projection mutation.

View File

@ -936,7 +936,9 @@ void TableJoin::resetToCross()
bool TableJoin::allowParallelHashJoin() const
{
if (!right_storage_name.empty() || !join_algorithm.isSet(JoinAlgorithm::PARALLEL_HASH))
if (std::find(join_algorithm.begin(), join_algorithm.end(), JoinAlgorithm::PARALLEL_HASH) == join_algorithm.end())
return false;
if (!right_storage_name.empty())
return false;
if (table_join.kind != JoinKind::Left && table_join.kind != JoinKind::Inner)
return false;

View File

@ -140,7 +140,7 @@ private:
const size_t default_max_bytes = 0;
const bool join_use_nulls = false;
const size_t max_joined_block_rows = 0;
MultiEnum<JoinAlgorithm> join_algorithm = MultiEnum<JoinAlgorithm>(JoinAlgorithm::AUTO);
std::vector<JoinAlgorithm> join_algorithm;
const size_t partial_merge_join_rows_in_right_blocks = 0;
const size_t partial_merge_join_left_table_buffer_bytes = 0;
const size_t max_files_to_merge = 0;
@ -236,7 +236,7 @@ public:
: size_limits(limits)
, default_max_bytes(0)
, join_use_nulls(use_nulls)
, join_algorithm(JoinAlgorithm::DEFAULT)
, join_algorithm({JoinAlgorithm::DEFAULT})
{
clauses.emplace_back().key_names_right = key_names_right;
table_join.kind = kind;
@ -253,16 +253,16 @@ public:
ActionsDAGPtr createJoinedBlockActions(ContextPtr context) const;
const std::vector<JoinAlgorithm> & getEnabledJoinAlgorithms() const { return join_algorithm; }
bool isEnabledAlgorithm(JoinAlgorithm val) const
{
/// When join_algorithm = 'default' (not specified by user) we use hash or direct algorithm.
/// This is the behaviour that was initially supported by ClickHouse.
bool is_enabled_by_default = val == JoinAlgorithm::DEFAULT
|| val == JoinAlgorithm::HASH
|| val == JoinAlgorithm::DIRECT;
if (join_algorithm.isSet(JoinAlgorithm::DEFAULT) && is_enabled_by_default)
bool is_default_enabled = std::find(join_algorithm.begin(), join_algorithm.end(), JoinAlgorithm::DEFAULT) != join_algorithm.end();
if (is_default_enabled && (val == JoinAlgorithm::DEFAULT || val == JoinAlgorithm::HASH || val == JoinAlgorithm::DIRECT))
return true;
return join_algorithm.isSet(val);
return std::find(join_algorithm.begin(), join_algorithm.end(), val) != join_algorithm.end();
}
bool allowParallelHashJoin() const;

View File

@ -14,6 +14,11 @@
#include <Core/Defines.h>
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
namespace ProfileEvents
{
extern const Event ExternalProcessingFilesTotal;
}
namespace DB
{
@ -97,6 +102,8 @@ FileSegmentsHolderPtr TemporaryDataOnDisk::createCacheFile(size_t max_file_size)
if (!file_cache)
throw Exception(ErrorCodes::LOGICAL_ERROR, "TemporaryDataOnDiskScope has no cache");
ProfileEvents::increment(ProfileEvents::ExternalProcessingFilesTotal);
const auto key = FileSegment::Key::random();
auto holder = file_cache->set(key, 0, std::max(10_MiB, max_file_size), CreateFileSegmentSettings(FileSegmentKind::Temporary, /* unbounded */ true));
fs::create_directories(file_cache->getPathInLocalCache(key));
@ -120,7 +127,7 @@ TemporaryFileOnDiskHolder TemporaryDataOnDisk::createRegularFile(size_t max_file
{
disk = volume->getDisk();
}
/// We do not increment ProfileEvents::ExternalProcessingFilesTotal here because it is incremented in TemporaryFileOnDisk constructor.
return std::make_unique<TemporaryFileOnDisk>(disk, current_metric_scope);
}

View File

@ -201,6 +201,33 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
partition->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::ADD_STATISTIC)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "ADD STATISTIC " << (if_not_exists ? "IF NOT EXISTS " : "")
<< (settings.hilite ? hilite_none : "");
statistic_decl->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::DROP_STATISTIC)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << (clear_statistic ? "CLEAR " : "DROP ") << "STATISTIC "
<< (if_exists ? "IF EXISTS " : "") << (settings.hilite ? hilite_none : "");
statistic_decl->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::MATERIALIZE_STATISTIC)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "MATERIALIZE STATISTIC " << (settings.hilite ? hilite_none : "");
statistic_decl->formatImpl(settings, state, frame);
if (partition)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << " IN PARTITION " << (settings.hilite ? hilite_none : "");
partition->formatImpl(settings, state, frame);
}
}
else if (type == ASTAlterCommand::ADD_CONSTRAINT)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "ADD CONSTRAINT " << (if_not_exists ? "IF NOT EXISTS " : "")

View File

@ -54,6 +54,10 @@ public:
DROP_PROJECTION,
MATERIALIZE_PROJECTION,
ADD_STATISTIC,
DROP_STATISTIC,
MATERIALIZE_STATISTIC,
DROP_PARTITION,
DROP_DETACHED_PARTITION,
ATTACH_PARTITION,
@ -129,6 +133,8 @@ public:
*/
ASTPtr projection;
ASTPtr statistic_decl;
/** Used in DROP PARTITION, ATTACH PARTITION FROM, UPDATE, DELETE queries.
* The value or ID of the partition is stored here.
*/
@ -167,6 +173,8 @@ public:
bool clear_index = false; /// for CLEAR INDEX (do not drop index from metadata)
bool clear_statistic = false; /// for CLEAR STATISTIC (do not drop statistic from metadata)
bool clear_projection = false; /// for CLEAR PROJECTION (do not drop projection from metadata)
bool if_not_exists = false; /// option for ADD_COLUMN

View File

@ -39,6 +39,12 @@ ASTPtr ASTColumnDeclaration::clone() const
res->children.push_back(res->codec);
}
if (stat_type)
{
res->stat_type = stat_type->clone();
res->children.push_back(res->stat_type);
}
if (ttl)
{
res->ttl = ttl->clone();
@ -99,6 +105,12 @@ void ASTColumnDeclaration::formatImpl(const FormatSettings & settings, FormatSta
codec->formatImpl(settings, state, frame);
}
if (stat_type)
{
settings.ostr << ' ';
stat_type->formatImpl(settings, state, frame);
}
if (ttl)
{
settings.ostr << ' ' << (settings.hilite ? hilite_keyword : "") << "TTL" << (settings.hilite ? hilite_none : "") << ' ';

View File

@ -19,6 +19,7 @@ public:
bool ephemeral_default = false;
ASTPtr comment;
ASTPtr codec;
ASTPtr stat_type;
ASTPtr ttl;
ASTPtr collation;
bool primary_key_specifier = false;

View File

@ -0,0 +1,42 @@
#include <Parsers/ASTStatisticDeclaration.h>
#include <Parsers/ASTIdentifier.h>
#include <Common/quoteString.h>
#include <IO/Operators.h>
#include <Parsers/ASTFunction.h>
namespace DB
{
ASTPtr ASTStatisticDeclaration::clone() const
{
auto res = std::make_shared<ASTStatisticDeclaration>();
res->set(res->columns, columns->clone());
res->type = type;
return res;
}
std::vector<String> ASTStatisticDeclaration::getColumnNames() const
{
std::vector<String> result;
result.reserve(columns->children.size());
for (const ASTPtr & column_ast : columns->children)
{
result.push_back(column_ast->as<ASTIdentifier &>().name());
}
return result;
}
void ASTStatisticDeclaration::formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const
{
columns->formatImpl(s, state, frame);
s.ostr << (s.hilite ? hilite_keyword : "") << " TYPE " << (s.hilite ? hilite_none : "");
s.ostr << backQuoteIfNeed(type);
}
}

View File

@ -0,0 +1,28 @@
#pragma once
#include <Parsers/IAST.h>
namespace DB
{
class ASTFunction;
/** A statistic declaration `columns TYPE typename(args)` in a CREATE or ALTER query.
*/
class ASTStatisticDeclaration : public IAST
{
public:
IAST * columns;
/// TODO type should be a list of ASTFunction, for example, 'tdigest(256), hyperloglog(128)', etc.
String type;
/** Get the text that identifies this element. */
String getID(char) const override { return "Stat"; }
std::vector<String> getColumnNames() const;
ASTPtr clone() const override;
void formatImpl(const FormatSettings & s, FormatState & state, FormatStateStacked frame) const override;
};
}

View File

@ -684,6 +684,33 @@ bool ParserCodec::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
return true;
}
bool ParserStatisticType::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserList stat_type_parser(std::make_unique<ParserIdentifierWithOptionalParameters>(),
std::make_unique<ParserToken>(TokenType::Comma), false);
if (pos->type != TokenType::OpeningRoundBracket)
return false;
ASTPtr stat_type;
++pos;
if (!stat_type_parser.parse(pos, stat_type, expected))
return false;
if (pos->type != TokenType::ClosingRoundBracket)
return false;
++pos;
auto function_node = std::make_shared<ASTFunction>();
function_node->name = "STATISTIC";
function_node->arguments = stat_type;
function_node->children.push_back(function_node->arguments);
node = function_node;
return true;
}
bool ParserCollation::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ASTPtr collation;

View File

@ -197,6 +197,14 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/// STATISTIC(tdigest(200))
class ParserStatisticType : public IParserBase
{
protected:
const char * getName() const override { return "statistic"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
/** Parse collation
* COLLATE utf8_unicode_ci NOT NULL
*/

View File

@ -44,6 +44,11 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_clear_index("CLEAR INDEX");
ParserKeyword s_materialize_index("MATERIALIZE INDEX");
ParserKeyword s_add_statistic("ADD STATISTIC");
ParserKeyword s_drop_statistic("DROP STATISTIC");
ParserKeyword s_clear_statistic("CLEAR STATISTIC");
ParserKeyword s_materialize_statistic("MATERIALIZE STATISTIC");
ParserKeyword s_add_constraint("ADD CONSTRAINT");
ParserKeyword s_drop_constraint("DROP CONSTRAINT");
@ -112,6 +117,7 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserIdentifier parser_remove_property;
ParserCompoundColumnDeclaration parser_col_decl;
ParserIndexDeclaration parser_idx_decl;
ParserStatisticDeclaration parser_stat_decl;
ParserConstraintDeclaration parser_constraint_decl;
ParserProjectionDeclaration parser_projection_decl;
ParserCompoundColumnDeclaration parser_modify_col_decl(false, false, true);
@ -327,6 +333,61 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
return false;
}
}
else if (s_add_statistic.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))
command->if_not_exists = true;
if (!parser_stat_decl.parse(pos, command->statistic_decl, expected))
return false;
command->type = ASTAlterCommand::ADD_STATISTIC;
}
else if (s_drop_statistic.ignore(pos, expected))
{
if (s_if_exists.ignore(pos, expected))
command->if_exists = true;
if (!parser_stat_decl.parse(pos, command->statistic_decl, expected))
return false;
command->type = ASTAlterCommand::DROP_STATISTIC;
}
else if (s_clear_statistic.ignore(pos, expected))
{
if (s_if_exists.ignore(pos, expected))
command->if_exists = true;
if (!parser_stat_decl.parse(pos, command->statistic_decl, expected))
return false;
command->type = ASTAlterCommand::DROP_STATISTIC;
command->clear_statistic = true;
command->detach = false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
}
else if (s_materialize_statistic.ignore(pos, expected))
{
if (s_if_exists.ignore(pos, expected))
command->if_exists = true;
if (!parser_stat_decl.parse(pos, command->statistic_decl, expected))
return false;
command->type = ASTAlterCommand::MATERIALIZE_STATISTIC;
command->detach = false;
if (s_in_partition.ignore(pos, expected))
{
if (!parser_partition.parse(pos, command->partition, expected))
return false;
}
}
else if (s_add_projection.ignore(pos, expected))
{
if (s_if_not_exists.ignore(pos, expected))

View File

@ -6,6 +6,7 @@
#include <Parsers/ASTFunction.h>
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTStatisticDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTProjectionDeclaration.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
@ -161,6 +162,33 @@ bool ParserIndexDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expe
return true;
}
bool ParserStatisticDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_type("TYPE");
ParserList columns_p(std::make_unique<ParserIdentifier>(), std::make_unique<ParserToken>(TokenType::Comma), false);
ParserIdentifier type_p;
ASTPtr columns;
ASTPtr type;
if (!columns_p.parse(pos, columns, expected))
return false;
if (!s_type.ignore(pos, expected))
return false;
if (!type_p.parse(pos, type, expected))
return false;
auto stat = std::make_shared<ASTStatisticDeclaration>();
stat->set(stat->columns, columns);
stat->type = type->as<ASTIdentifier &>().name();
node = stat;
return true;
}
bool ParserConstraintDeclaration::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
ParserKeyword s_check("CHECK");

View File

@ -131,6 +131,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ParserKeyword s_auto_increment{"AUTO_INCREMENT"};
ParserKeyword s_comment{"COMMENT"};
ParserKeyword s_codec{"CODEC"};
ParserKeyword s_stat{"STATISTIC"};
ParserKeyword s_ttl{"TTL"};
ParserKeyword s_remove{"REMOVE"};
ParserKeyword s_type{"TYPE"};
@ -141,6 +142,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ParserLiteral literal_parser;
ParserCodec codec_parser;
ParserCollation collation_parser;
ParserStatisticType stat_type_parser;
ParserExpression expression_parser;
/// mandatory column name
@ -176,6 +178,7 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
ASTPtr default_expression;
ASTPtr comment_expression;
ASTPtr codec_expression;
ASTPtr stat_type_expression;
ASTPtr ttl_expression;
ASTPtr collation_expression;
bool primary_key_specifier = false;
@ -301,6 +304,12 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
return false;
}
if (s_stat.ignore(pos, expected))
{
if (!stat_type_parser.parse(pos, stat_type_expression, expected))
return false;
}
if (s_ttl.ignore(pos, expected))
{
if (!expression_parser.parse(pos, ttl_expression, expected))
@ -342,6 +351,12 @@ bool IParserColumnDeclaration<NameParser>::parseImpl(Pos & pos, ASTPtr & node, E
column_declaration->children.push_back(std::move(codec_expression));
}
if (stat_type_expression)
{
column_declaration->stat_type = stat_type_expression;
column_declaration->children.push_back(std::move(stat_type_expression));
}
if (ttl_expression)
{
column_declaration->ttl = ttl_expression;
@ -389,6 +404,16 @@ protected:
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserStatisticDeclaration : public IParserBase
{
public:
ParserStatisticDeclaration() = default;
protected:
const char * getName() const override { return "statistics declaration"; }
bool parseImpl(Pos & pos, ASTPtr & node, Expected & expected) override;
};
class ParserConstraintDeclaration : public IParserBase
{
protected:
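
A sketch (not from the commit) of the query shapes the new grammar accepts, driven through the ordinary parsing entry point; the table name, column set and depth limit are illustrative:

```cpp
#include <Parsers/ParserCreateQuery.h>
#include <Parsers/ParserAlterQuery.h>
#include <Parsers/parseQuery.h>

void parseStatisticExamples()
{
    DB::ParserCreateQuery create_parser;
    DB::ASTPtr create_ast = DB::parseQuery(create_parser,
        "CREATE TABLE t (a Int64 STATISTIC(tdigest), b Float64) ENGINE = MergeTree ORDER BY a",
        "", /*max_query_size=*/0, /*max_parser_depth=*/1000);

    DB::ParserAlterQuery alter_parser;
    DB::ASTPtr alter_ast = DB::parseQuery(alter_parser,
        "ALTER TABLE t ADD STATISTIC a TYPE tdigest",
        "", /*max_query_size=*/0, /*max_parser_depth=*/1000);

    // DROP STATISTIC, CLEAR STATISTIC ... IN PARTITION and MATERIALIZE STATISTIC reuse the
    // same `columns TYPE type` declaration parsed by ParserStatisticDeclaration above.
}
```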

View File

@ -558,7 +558,7 @@ void trySetStorageInTableJoin(const QueryTreeNodePtr & table_expression, std::sh
return;
}
if (!table_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
if (!table_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT) && !table_join->isEnabledAlgorithm(JoinAlgorithm::DEFAULT))
return;
if (auto storage_dictionary = std::dynamic_pointer_cast<StorageDictionary>(storage);
@ -645,6 +645,74 @@ std::shared_ptr<DirectKeyValueJoin> tryDirectJoin(const std::shared_ptr<TableJoi
}
static std::shared_ptr<IJoin> tryCreateJoin(JoinAlgorithm algorithm,
std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
const Block & left_table_expression_header,
const Block & right_table_expression_header,
const PlannerContextPtr & planner_context)
{
/// Direct JOIN with special storages that support key value access. For example JOIN with Dictionary
if (algorithm == JoinAlgorithm::DIRECT || algorithm == JoinAlgorithm::DEFAULT)
{
JoinPtr direct_join = tryDirectJoin(table_join, right_table_expression, right_table_expression_header, planner_context);
if (direct_join)
return direct_join;
}
if (algorithm == JoinAlgorithm::PARTIAL_MERGE ||
algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE)
{
if (MergeJoin::isSupported(table_join))
return std::make_shared<MergeJoin>(table_join, right_table_expression_header);
}
if (algorithm == JoinAlgorithm::HASH ||
/// partial_merge is preferred, but can't be used for the specified kind of join, so fall back to hash
algorithm == JoinAlgorithm::PREFER_PARTIAL_MERGE ||
algorithm == JoinAlgorithm::PARALLEL_HASH ||
algorithm == JoinAlgorithm::DEFAULT)
{
if (table_join->allowParallelHashJoin())
{
auto query_context = planner_context->getQueryContext();
return std::make_shared<ConcurrentHashJoin>(query_context, table_join, query_context->getSettings().max_threads, right_table_expression_header);
}
return std::make_shared<HashJoin>(table_join, right_table_expression_header);
}
if (algorithm == JoinAlgorithm::FULL_SORTING_MERGE)
{
if (FullSortingMergeJoin::isSupported(table_join))
return std::make_shared<FullSortingMergeJoin>(table_join, right_table_expression_header);
}
if (algorithm == JoinAlgorithm::GRACE_HASH)
{
if (GraceHashJoin::isSupported(table_join))
{
auto query_context = planner_context->getQueryContext();
return std::make_shared<GraceHashJoin>(
query_context,
table_join,
left_table_expression_header,
right_table_expression_header,
query_context->getTempDataOnDisk());
}
}
if (algorithm == JoinAlgorithm::AUTO)
{
if (MergeJoin::isSupported(table_join))
return std::make_shared<JoinSwitcher>(table_join, right_table_expression_header);
return std::make_shared<HashJoin>(table_join, right_table_expression_header);
}
return nullptr;
}
std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_join,
const QueryTreeNodePtr & right_table_expression,
const Block & left_table_expression_header,
@ -679,7 +747,7 @@ std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_jo
if (table_join->isJoinWithConstant())
{
if (!table_join->isEnabledAlgorithm(JoinAlgorithm::HASH))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "JOIN with constant supported only with join algorithm 'hash'");
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "JOIN ON constant supported only with join algorithm 'hash'");
return std::make_shared<HashJoin>(table_join, right_table_expression_header);
}
@ -687,60 +755,11 @@ std::shared_ptr<IJoin> chooseJoinAlgorithm(std::shared_ptr<TableJoin> & table_jo
if (!table_join->oneDisjunct() && !table_join->isEnabledAlgorithm(JoinAlgorithm::HASH) && !table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Only `hash` join supports multiple ORs for keys in JOIN ON section");
/// Direct JOIN with special storages that support key value access. For example JOIN with Dictionary
if (table_join->isEnabledAlgorithm(JoinAlgorithm::DIRECT))
for (auto algorithm : table_join->getEnabledJoinAlgorithms())
{
JoinPtr direct_join = tryDirectJoin(table_join, right_table_expression, right_table_expression_header, planner_context);
if (direct_join)
return direct_join;
}
if (table_join->isEnabledAlgorithm(JoinAlgorithm::PARTIAL_MERGE) ||
table_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE))
{
if (MergeJoin::isSupported(table_join))
return std::make_shared<MergeJoin>(table_join, right_table_expression_header);
}
if (table_join->isEnabledAlgorithm(JoinAlgorithm::HASH) ||
/// partial_merge is preferred, but can't be used for the specified kind of join, so fall back to hash
table_join->isEnabledAlgorithm(JoinAlgorithm::PREFER_PARTIAL_MERGE) ||
table_join->isEnabledAlgorithm(JoinAlgorithm::PARALLEL_HASH))
{
if (table_join->allowParallelHashJoin())
{
auto query_context = planner_context->getQueryContext();
return std::make_shared<ConcurrentHashJoin>(query_context, table_join, query_context->getSettings().max_threads, right_table_expression_header);
}
return std::make_shared<HashJoin>(table_join, right_table_expression_header);
}
if (table_join->isEnabledAlgorithm(JoinAlgorithm::FULL_SORTING_MERGE))
{
if (FullSortingMergeJoin::isSupported(table_join))
return std::make_shared<FullSortingMergeJoin>(table_join, right_table_expression_header);
}
if (table_join->isEnabledAlgorithm(JoinAlgorithm::GRACE_HASH))
{
if (GraceHashJoin::isSupported(table_join))
{
auto query_context = planner_context->getQueryContext();
return std::make_shared<GraceHashJoin>(
query_context,
table_join,
left_table_expression_header,
right_table_expression_header,
query_context->getTempDataOnDisk());
}
}
if (table_join->isEnabledAlgorithm(JoinAlgorithm::AUTO))
{
if (MergeJoin::isSupported(table_join))
return std::make_shared<JoinSwitcher>(table_join, right_table_expression_header);
return std::make_shared<HashJoin>(table_join, right_table_expression_header);
auto join = tryCreateJoin(algorithm, table_join, right_table_expression, left_table_expression_header, right_table_expression_header, planner_context);
if (join)
return join;
}
throw Exception(ErrorCodes::NOT_IMPLEMENTED,

View File

@ -167,7 +167,9 @@ void CSVFormatReader::skipRow()
else if (*pos == '\r')
{
++istr.position();
if (!istr.eof() && *pos == '\n')
if (format_settings.csv.allow_cr_end_of_line)
return;
else if (!istr.eof() && *pos == '\n')
{
++pos;
return;
@ -509,7 +511,7 @@ void registerInputFormatCSV(FormatFactory & factory)
registerWithNamesAndTypes("CSV", register_func);
}
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows)
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings)
{
char * pos = in.position();
bool quotes = false;
@ -561,7 +563,9 @@ std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memor
else if (*pos == '\r')
{
++pos;
if (loadAtPosition(in, memory, pos) && *pos == '\n')
if (settings.csv.allow_cr_end_of_line)
continue;
else if (loadAtPosition(in, memory, pos) && *pos == '\n')
++pos;
else
continue;
@ -584,9 +588,12 @@ void registerFileSegmentationEngineCSV(FormatFactory & factory)
auto register_func = [&](const String & format_name, bool, bool)
{
static constexpr size_t min_rows = 3; /// Make it 3 for header auto detection (the first 3 rows must always be in the same segment).
factory.registerFileSegmentationEngine(format_name, [](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
factory.registerFileSegmentationEngineCreator(format_name, [](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows);
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, min_rows, max_rows, settings);
};
});
};
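
The flag consulted in the branches above comes from FormatSettings; a minimal illustration (input bytes hypothetical) of what it changes:

```cpp
#include <Formats/FormatSettings.h>

void csvWithBareCarriageReturns()
{
    DB::FormatSettings settings;
    settings.csv.allow_cr_end_of_line = true;
    // With the flag set, a bare '\r' ends a row, so "1,foo\r2,bar\r" segments and parses as
    // two rows; without it, a '\r' must still be followed by '\n' to terminate the row.
}
```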

View File

@ -116,6 +116,6 @@ private:
DataTypes buffered_types;
};
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows);
std::pair<bool, size_t> fileSegmentationEngineCSVImpl(ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t min_rows, size_t max_rows, const FormatSettings & settings);
}

View File

@ -66,10 +66,13 @@ void registerInputFormatHiveText(FormatFactory & factory)
void registerFileSegmentationEngineHiveText(FormatFactory & factory)
{
factory.registerFileSegmentationEngine(
factory.registerFileSegmentationEngineCreator(
"HiveText",
[](ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows) -> std::pair<bool, size_t> {
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows);
[](const FormatSettings & settings) -> FormatFactory::FileSegmentationEngine {
return [settings] (ReadBuffer & in, DB::Memory<> & memory, size_t min_bytes, size_t max_rows)
{
return fileSegmentationEngineCSVImpl(in, memory, min_bytes, 0, max_rows, settings);
};
});
}

View File

@ -40,6 +40,7 @@ void ParallelParsingInputFormat::segmentatorThreadFunction(ThreadGroupPtr thread
unit.segment.resize(0);
size_t segment_start = getDataOffsetMaybeCompressed(*in);
auto file_segmentation_engine = file_segmentation_engine_creator(format_settings);
auto [have_more_data, currently_read_rows] = file_segmentation_engine(*in, unit.segment, min_chunk_bytes, max_block_size);
unit.original_segment_size = getDataOffsetMaybeCompressed(*in) - segment_start;

View File

@ -86,8 +86,9 @@ public:
ReadBuffer & in;
Block header;
InternalParserCreator internal_parser_creator;
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
String format_name;
FormatSettings format_settings;
size_t max_threads;
size_t min_chunk_bytes;
size_t max_block_size;
@ -97,8 +98,9 @@ public:
explicit ParallelParsingInputFormat(Params params)
: IInputFormat(std::move(params.header), &params.in)
, internal_parser_creator(params.internal_parser_creator)
, file_segmentation_engine(params.file_segmentation_engine)
, file_segmentation_engine_creator(params.file_segmentation_engine_creator)
, format_name(params.format_name)
, format_settings(params.format_settings)
, min_chunk_bytes(params.min_chunk_bytes)
, max_block_size(params.max_block_size)
, is_server(params.is_server)
@ -197,8 +199,9 @@ private:
const InternalParserCreator internal_parser_creator;
/// Function to segment the file. Then "parsers" will parse those segments.
FormatFactory::FileSegmentationEngine file_segmentation_engine;
FormatFactory::FileSegmentationEngineCreator file_segmentation_engine_creator;
const String format_name;
const FormatSettings format_settings;
const size_t min_chunk_bytes;
const size_t max_block_size;

View File

@ -15,6 +15,29 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
std::vector<std::pair<String, String>> describeJoinActions(const JoinPtr & join)
{
std::vector<std::pair<String, String>> description;
const auto & table_join = join->getTableJoin();
description.emplace_back("Type", toString(table_join.kind()));
description.emplace_back("Strictness", toString(table_join.strictness()));
description.emplace_back("Algorithm", join->getName());
if (table_join.strictness() == JoinStrictness::Asof)
description.emplace_back("ASOF inequality", toString(table_join.getAsofInequality()));
if (!table_join.getClauses().empty())
description.emplace_back("Clauses", table_join.formatClauses(table_join.getClauses(), true /*short_format*/));
return description;
}
}
JoinStep::JoinStep(
const DataStream & left_stream_,
const DataStream & right_stream_,
@ -65,30 +88,14 @@ void JoinStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
const auto & table_join = join->getTableJoin();
settings.out << prefix << "Type: " << toString(table_join.kind()) << '\n';
settings.out << prefix << "Strictness: " << toString(table_join.strictness()) << '\n';
settings.out << prefix << "Algorithm: " << join->getName() << '\n';
if (table_join.strictness() == JoinStrictness::Asof)
settings.out << prefix << "ASOF inequality: " << toString(table_join.getAsofInequality()) << '\n';
if (!table_join.getClauses().empty())
settings.out << prefix << "Clauses: " << table_join.formatClauses(table_join.getClauses(), true /*short_format*/) << '\n';
for (const auto & [name, value] : describeJoinActions(join))
settings.out << prefix << name << ": " << value << '\n';
}
void JoinStep::describeActions(JSONBuilder::JSONMap & map) const
{
const auto & table_join = join->getTableJoin();
map.add("Type", toString(table_join.kind()));
map.add("Strictness", toString(table_join.strictness()));
map.add("Algorithm", join->getName());
if (table_join.strictness() == JoinStrictness::Asof)
map.add("ASOF inequality", toString(table_join.getAsofInequality()));
if (!table_join.getClauses().empty())
map.add("Clauses", table_join.formatClauses(table_join.getClauses(), true /*short_format*/));
for (const auto & [name, value] : describeJoinActions(join))
map.add(name, value);
}
void JoinStep::updateOutputStream()
@ -151,4 +158,18 @@ void FilledJoinStep::updateOutputStream()
input_streams.front(), JoiningTransform::transformHeader(input_streams.front().header, join), getDataStreamTraits());
}
void FilledJoinStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, ' ');
for (const auto & [name, value] : describeJoinActions(join))
settings.out << prefix << name << ": " << value << '\n';
}
void FilledJoinStep::describeActions(JSONBuilder::JSONMap & map) const
{
for (const auto & [name, value] : describeJoinActions(join))
map.add(name, value);
}
}
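The join-description changes above replace two copies of the same EXPLAIN output code with a single describeJoinActions helper returning name/value pairs, which both the text and JSON renderers then iterate; FilledJoinStep gains describeActions by reusing the same helper. A small self-contained sketch of this shape, with placeholder values instead of the real TableJoin accessors:

```cpp
// Sketch: build the description once as name/value pairs, render it twice.
#include <iostream>
#include <string>
#include <utility>
#include <vector>

using Description = std::vector<std::pair<std::string, std::string>>;

// One place that knows what a join looks like.
Description describeJoinActions()
{
    Description description;
    description.emplace_back("Type", "INNER");
    description.emplace_back("Strictness", "ALL");
    description.emplace_back("Algorithm", "HashJoin");
    return description;
}

// Plain-text renderer (EXPLAIN without FORMAT JSON).
void describeAsText(const Description & description, std::ostream & out, size_t offset)
{
    std::string prefix(offset, ' ');
    for (const auto & [name, value] : description)
        out << prefix << name << ": " << value << '\n';
}

// JSON renderer, kept deliberately trivial here.
void describeAsJSON(const Description & description, std::ostream & out)
{
    out << "{";
    for (size_t i = 0; i < description.size(); ++i)
        out << (i ? "," : "") << "\"" << description[i].first << "\":\"" << description[i].second << "\"";
    out << "}\n";
}

int main()
{
    const auto description = describeJoinActions();
    describeAsText(description, std::cout, 2);
    describeAsJSON(description, std::cout);
}
```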

View File

@ -54,6 +54,9 @@ public:
String getName() const override { return "FilledJoin"; }
void transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings &) override;
void describeActions(JSONBuilder::JSONMap & map) const override;
void describeActions(FormatSettings & settings) const override;
const JoinPtr & getJoin() const { return join; }
private:

View File

@ -161,6 +161,7 @@ void optimizePrewhere(Stack & stack, QueryPlan::Nodes & nodes)
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
storage_metadata,
storage.getConditionEstimatorByPredicate(read_from_merge_tree->getQueryInfo(), storage_snapshot, context),
queried_columns,
storage.supportedPrewhereColumns(),
&Poco::Logger::get("QueryPlanOptimizePrewhere")};

View File

@ -50,6 +50,7 @@ PrometheusMetricsWriter::PrometheusMetricsWriter(
, send_events(config.getBool(config_name + ".events", true))
, send_metrics(config.getBool(config_name + ".metrics", true))
, send_asynchronous_metrics(config.getBool(config_name + ".asynchronous_metrics", true))
, send_errors(config.getBool(config_name + ".errors", true))
{
}
@ -112,12 +113,32 @@ void PrometheusMetricsWriter::write(WriteBuffer & wb) const
std::string metric_doc{value.documentation};
convertHelpToSingleLine(metric_doc);
// TODO: add HELP section? asynchronous_metrics contains only key and value
writeOutLine(wb, "# HELP", key, metric_doc);
writeOutLine(wb, "# TYPE", key, "gauge");
writeOutLine(wb, key, value.value);
}
}
if (send_errors)
{
for (size_t i = 0, end = ErrorCodes::end(); i < end; ++i)
{
const auto & error = ErrorCodes::values[i].get();
std::string_view name = ErrorCodes::getName(static_cast<ErrorCodes::ErrorCode>(i));
if (name.empty())
continue;
std::string key{error_metrics_prefix + toString(name)};
std::string help = fmt::format("The number of {} errors since last server restart", name);
writeOutLine(wb, "# HELP", key, help);
writeOutLine(wb, "# TYPE", key, "counter");
/// We are only interested in errors that happened on this server.
writeOutLine(wb, key, error.local.count);
}
}
}
}
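The new errors section makes the Prometheus writer export one counter per named error code, prefixed with ClickHouseErrorMetric_ and reporting only locally observed occurrences. The following sketch only illustrates the text exposition format being emitted; the error table here is a dummy stand-in for the ErrorCodes registry.

```cpp
// Sketch: emit per-error counters in Prometheus text exposition format.
#include <cstdint>
#include <iostream>
#include <string>
#include <utility>
#include <vector>

int main()
{
    // Stand-in for the server's error-code registry: (name, local count since restart).
    const std::vector<std::pair<std::string, std::uint64_t>> errors = {
        {"UNKNOWN_TABLE", 3},
        {"TIMEOUT_EXCEEDED", 1},
    };

    const std::string prefix = "ClickHouseErrorMetric_";
    for (const auto & [name, count] : errors)
    {
        const std::string key = prefix + name;
        std::cout << "# HELP " << key << " The number of " << name << " errors since last server restart\n";
        std::cout << "# TYPE " << key << " counter\n";
        std::cout << key << ' ' << count << '\n';
    }
}
```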

View File

@ -27,10 +27,12 @@ private:
const bool send_events;
const bool send_metrics;
const bool send_asynchronous_metrics;
const bool send_errors;
static inline constexpr auto profile_events_prefix = "ClickHouseProfileEvents_";
static inline constexpr auto current_metrics_prefix = "ClickHouseMetrics_";
static inline constexpr auto asynchronous_metrics_prefix = "ClickHouseAsyncMetrics_";
static inline constexpr auto error_metrics_prefix = "ClickHouseErrorMetric_";
};
}

View File

@ -22,6 +22,7 @@
#include <Parsers/ASTIdentifier.h>
#include <Parsers/ASTIndexDeclaration.h>
#include <Parsers/ASTProjectionDeclaration.h>
#include <Parsers/ASTStatisticDeclaration.h>
#include <Parsers/ASTLiteral.h>
#include <Parsers/ASTSetQuery.h>
#include <Parsers/queryToString.h>
@ -41,6 +42,7 @@ namespace DB
namespace ErrorCodes
{
extern const int ILLEGAL_COLUMN;
extern const int ILLEGAL_STATISTIC;
extern const int BAD_ARGUMENTS;
extern const int NOT_FOUND_COLUMN_IN_BLOCK;
extern const int LOGICAL_ERROR;
@ -237,6 +239,21 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
return command;
}
else if (command_ast->type == ASTAlterCommand::ADD_STATISTIC)
{
AlterCommand command;
command.ast = command_ast->clone();
command.statistic_decl = command_ast->statistic_decl;
command.type = AlterCommand::ADD_STATISTIC;
const auto & ast_stat_decl = command_ast->statistic_decl->as<ASTStatisticDeclaration &>();
command.statistic_columns = ast_stat_decl.getColumnNames();
command.statistic_type = ast_stat_decl.type;
command.if_not_exists = command_ast->if_not_exists;
return command;
}
else if (command_ast->type == ASTAlterCommand::ADD_CONSTRAINT)
{
AlterCommand command;
@ -296,6 +313,23 @@ std::optional<AlterCommand> AlterCommand::parse(const ASTAlterCommand * command_
return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_STATISTIC)
{
AlterCommand command;
command.ast = command_ast->clone();
command.type = AlterCommand::DROP_STATISTIC;
const auto & ast_stat_decl = command_ast->statistic_decl->as<ASTStatisticDeclaration &>();
command.statistic_columns = ast_stat_decl.getColumnNames();
command.statistic_type = ast_stat_decl.type;
command.if_exists = command_ast->if_exists;
command.clear = command_ast->clear_statistic;
if (command_ast->partition)
command.partition = command_ast->partition;
return command;
}
else if (command_ast->type == ASTAlterCommand::DROP_PROJECTION)
{
AlterCommand command;
@ -582,6 +616,42 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
metadata.secondary_indices.erase(erase_it);
}
}
else if (type == ADD_STATISTIC)
{
for (const auto & statistic_column_name : statistic_columns)
{
if (!metadata.columns.has(statistic_column_name))
{
throw Exception(ErrorCodes::ILLEGAL_STATISTIC, "Cannot add statistic {} with type {}: this column is not found", statistic_column_name, statistic_type);
}
if (!if_not_exists && metadata.columns.get(statistic_column_name).stat)
throw Exception(ErrorCodes::ILLEGAL_STATISTIC, "Cannot add statistic {} with type {}: statistic on this column with this type already exists", statistic_column_name, statistic_type);
}
auto stats = StatisticDescription::getStatisticsFromAST(statistic_decl, metadata.columns);
for (auto && stat : stats)
{
metadata.columns.modify(stat.column_name,
[&](ColumnDescription & column) { column.stat = std::move(stat); });
}
}
else if (type == DROP_STATISTIC)
{
for (const auto & stat_column_name : statistic_columns)
{
if (!metadata.columns.has(stat_column_name) || !metadata.columns.get(stat_column_name).stat)
{
if (if_exists)
return;
throw Exception(ErrorCodes::ILLEGAL_STATISTIC, "Wrong statistic name. Cannot find statistic {} with type {} to drop", backQuote(stat_column_name), statistic_type);
}
if (!partition && !clear)
{
metadata.columns.modify(stat_column_name,
[&](ColumnDescription & column) { column.stat = std::nullopt; });
}
}
}
else if (type == ADD_CONSTRAINT)
{
auto constraints = metadata.constraints.getConstraints();
@ -684,6 +754,8 @@ void AlterCommand::apply(StorageInMemoryMetadata & metadata, ContextPtr context)
rename_visitor.visit(column_to_modify.default_desc.expression);
if (column_to_modify.ttl)
rename_visitor.visit(column_to_modify.ttl);
if (column_to_modify.name == column_name && column_to_modify.stat)
column_to_modify.stat->column_name = rename_to;
});
}
if (metadata.table_ttl.definition_ast)
@ -805,7 +877,7 @@ bool AlterCommand::isRequireMutationStage(const StorageInMemoryMetadata & metada
if (isRemovingProperty() || type == REMOVE_TTL || type == REMOVE_SAMPLE_BY)
return false;
if (type == DROP_INDEX || type == DROP_PROJECTION || type == RENAME_COLUMN)
if (type == DROP_INDEX || type == DROP_PROJECTION || type == RENAME_COLUMN || type == DROP_STATISTIC)
return true;
/// Drop alias is metadata alter, in other case mutation is required.
@ -913,6 +985,18 @@ std::optional<MutationCommand> AlterCommand::tryConvertToMutationCommand(Storage
result.predicate = nullptr;
}
else if (type == DROP_STATISTIC)
{
result.type = MutationCommand::Type::DROP_STATISTIC;
result.statistic_columns = statistic_columns;
if (clear)
result.clear = true;
if (partition)
result.partition = partition;
result.predicate = nullptr;
}
else if (type == DROP_PROJECTION)
{
result.type = MutationCommand::Type::DROP_PROJECTION;
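Taken together, these AlterCommands additions teach ALTER about ADD STATISTIC and DROP STATISTIC: adding validates that the column exists and does not already carry a statistic of that type, dropping detaches it (or only clears data when CLEAR or a partition is given), and a drop is converted into a mutation so existing parts get rewritten. Below is a toy sketch of just the add/drop bookkeeping on column metadata; names and error texts are simplified, not the real implementation.

```cpp
// Toy sketch of ADD/DROP STATISTIC bookkeeping on column metadata.
#include <iostream>
#include <map>
#include <optional>
#include <stdexcept>
#include <string>

struct ColumnDescription
{
    std::string type;
    std::optional<std::string> stat;  // e.g. "tdigest" when a statistic is declared
};

using Columns = std::map<std::string, ColumnDescription>;

void addStatistic(Columns & columns, const std::string & name, const std::string & stat_type)
{
    auto it = columns.find(name);
    if (it == columns.end())
        throw std::runtime_error("Cannot add statistic " + name + ": this column is not found");
    if (it->second.stat)
        throw std::runtime_error("Cannot add statistic " + name + ": statistic already exists");
    it->second.stat = stat_type;
}

void dropStatistic(Columns & columns, const std::string & name, bool if_exists)
{
    auto it = columns.find(name);
    if (it == columns.end() || !it->second.stat)
    {
        if (if_exists)
            return;
        throw std::runtime_error("Cannot find statistic " + name + " to drop");
    }
    it->second.stat.reset();
}

int main()
{
    Columns columns = {{"a", {"Int64", std::nullopt}}, {"b", {"Float64", std::nullopt}}};
    addStatistic(columns, "a", "tdigest");
    dropStatistic(columns, "a", /*if_exists=*/false);
    dropStatistic(columns, "a", /*if_exists=*/true);  // no-op thanks to IF EXISTS
    std::cout << "ok\n";
}
```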

View File

@ -38,6 +38,8 @@ struct AlterCommand
DROP_CONSTRAINT,
ADD_PROJECTION,
DROP_PROJECTION,
ADD_STATISTIC,
DROP_STATISTIC,
MODIFY_TTL,
MODIFY_SETTING,
RESET_SETTING,
@ -118,6 +120,10 @@ struct AlterCommand
/// For ADD/DROP PROJECTION
String projection_name;
ASTPtr statistic_decl = nullptr;
std::vector<String> statistic_columns;
String statistic_type;
/// For MODIFY TTL
ASTPtr ttl = nullptr;

View File

@ -24,7 +24,10 @@ struct ColumnDependency
TTL_EXPRESSION,
/// TTL is set for @column_name.
TTL_TARGET
TTL_TARGET,
/// Some statistic exists that requires @column_name
STATISTIC,
};
ColumnDependency(const String & column_name_, Kind kind_)

View File

@ -71,6 +71,7 @@ bool ColumnDescription::operator==(const ColumnDescription & other) const
return name == other.name
&& type->equals(*other.type)
&& default_desc == other.default_desc
&& stat == other.stat
&& ast_to_str(codec) == ast_to_str(other.codec)
&& ast_to_str(ttl) == ast_to_str(other.ttl);
}
@ -104,6 +105,12 @@ void ColumnDescription::writeText(WriteBuffer & buf) const
writeEscapedString(queryToString(codec), buf);
}
if (stat)
{
writeChar('\t', buf);
writeEscapedString(queryToString(stat->ast), buf);
}
if (ttl)
{
writeChar('\t', buf);

View File

@ -7,6 +7,7 @@
#include <Core/NamesAndAliases.h>
#include <Interpreters/Context_fwd.h>
#include <Storages/ColumnDefault.h>
#include <Storages/StatisticsDescription.h>
#include <Common/Exception.h>
#include <boost/multi_index/member.hpp>
@ -83,6 +84,7 @@ struct ColumnDescription
String comment;
ASTPtr codec;
ASTPtr ttl;
std::optional<StatisticDescription> stat;
ColumnDescription() = default;
ColumnDescription(ColumnDescription &&) = default;

View File

@ -969,6 +969,11 @@ NamesAndTypesList StorageHDFS::getVirtuals() const
return virtual_columns;
}
Names StorageHDFS::getVirtualColumnNames()
{
return VirtualColumnUtils::getPathFileAndSizeVirtualsForStorage({}).getNames();
}
SchemaCache & StorageHDFS::getSchemaCache(const ContextPtr & ctx)
{
static SchemaCache schema_cache(ctx->getConfigRef().getUInt("schema_inference_cache_max_elements_for_hdfs", DEFAULT_SCHEMA_CACHE_ELEMENTS));

View File

@ -69,6 +69,7 @@ public:
TableExclusiveLockHolder &) override;
NamesAndTypesList getVirtuals() const override;
static Names getVirtualColumnNames();
bool supportsPartitionBy() const override { return true; }

View File

@ -12,6 +12,7 @@
#include <Processors/QueryPlan/ReadFromPreparedSource.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Storages/AlterCommands.h>
#include <Storages/Statistics/Estimator.h>
#include <Backups/RestorerFromBackup.h>
#include <Backups/IBackup.h>
@ -225,6 +226,11 @@ StorageID IStorage::getStorageID() const
return storage_id;
}
ConditionEstimator IStorage::getConditionEstimatorByPredicate(const SelectQueryInfo &, const StorageSnapshotPtr &, ContextPtr) const
{
return {};
}
void IStorage::renameInMemory(const StorageID & new_table_id)
{
std::lock_guard lock(id_mutex);

View File

@ -68,6 +68,8 @@ using DatabaseAndTableName = std::pair<String, String>;
class BackupEntriesCollector;
class RestorerFromBackup;
class ConditionEstimator;
struct ColumnSize
{
size_t marks = 0;
@ -135,6 +137,8 @@ public:
/// Returns true if the storage supports queries with the PREWHERE section.
virtual bool supportsPrewhere() const { return false; }
virtual ConditionEstimator getConditionEstimatorByPredicate(const SelectQueryInfo &, const StorageSnapshotPtr &, ContextPtr) const;
/// Returns which columns support PREWHERE, or std::nullopt if all columns are supported.
/// This is needed for engines that aggregate data from multiple tables, like Merge.
virtual std::optional<NameSet> supportedPrewhereColumns() const { return std::nullopt; }

View File

@ -10,6 +10,8 @@
#include <Parsers/parseQuery.h>
#include <Storages/extractKeyExpressionList.h>
#include <Storages/ReplaceAliasByExpressionVisitor.h>
#include <Core/Defines.h>
#include "Common/Exception.h"
@ -22,6 +24,11 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
using ReplaceAliasToExprVisitor = InDepthNodeVisitor<ReplaceAliasByExpressionMatcher, true>;
}
IndexDescription::IndexDescription(const IndexDescription & other)
: definition_ast(other.definition_ast ? other.definition_ast->clone() : nullptr)
, expression_list_ast(other.expression_list_ast ? other.expression_list_ast->clone() : nullptr)
@ -94,6 +101,10 @@ IndexDescription IndexDescription::getIndexFromAST(const ASTPtr & definition_ast
if (index_definition->expr)
{
expr_list = extractKeyExpressionList(index_definition->expr->clone());
ReplaceAliasToExprVisitor::Data data{columns};
ReplaceAliasToExprVisitor{data}.visit(expr_list);
result.expression_list_ast = expr_list->clone();
}
else

View File

@ -779,7 +779,7 @@ MergeTreeData::MutableDataPartPtr Fetcher::downloadPartToMemory(
}
MergedBlockOutputStream part_out(
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {},
new_data_part, metadata_snapshot, block.getNamesAndTypesList(), {}, {},
CompressionCodecFactory::instance().get("NONE", {}), NO_TRANSACTION_PTR);
part_out.write(block);

View File

@ -18,6 +18,7 @@
#include <Storages/MergeTree/PartMetadataManagerOrdinary.h>
#include <Core/NamesAndTypes.h>
#include <Storages/ColumnsDescription.h>
#include <Compression/CompressedReadBuffer.h>
#include <Common/StringUtils/StringUtils.h>
#include <Common/escapeForFileName.h>
#include <Common/CurrentMetrics.h>
@ -628,6 +629,31 @@ String IMergeTreeDataPart::getColumnNameWithMinimumCompressedSize(bool with_subc
return *minimum_size_column;
}
Statistics IMergeTreeDataPart::loadStatistics() const
{
const auto & metadata_snapshot = storage.getInMemoryMetadata();
auto total_statistics = MergeTreeStatisticsFactory::instance().getMany(metadata_snapshot.getColumns());
Statistics result;
for (auto & stat : total_statistics)
{
String file_name = stat->getFileName() + STAT_FILE_SUFFIX;
String file_path = fs::path(getDataPartStorage().getRelativePath()) / file_name;
if (!metadata_manager->exists(file_name))
{
LOG_INFO(storage.log, "Cannot find stats file {}", file_path);
continue;
}
auto stat_file = metadata_manager->read(file_name);
CompressedReadBuffer compressed_buffer(*stat_file);
stat->deserialize(compressed_buffer);
result.push_back(stat);
}
return result;
}
void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency)
{
assertOnDisk();
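loadStatistics above walks the statistics declared in the table metadata and, for each one, tries to read the corresponding compressed .stat file of the part through the metadata manager, logging and skipping any file that is missing rather than failing. A rough sketch of that tolerant loading loop, using an assumed statistic_<column>.stat naming and plain filesystem checks instead of the metadata manager:

```cpp
// Sketch: load whatever per-column statistic files exist in a part directory,
// skipping (and logging) the ones that are missing, as loadStatistics() does.
#include <filesystem>
#include <iostream>
#include <string>
#include <vector>

namespace fs = std::filesystem;

int main()
{
    const fs::path part_path = "store/all_1_1_0";          // hypothetical part directory
    const std::vector<std::string> columns = {"a", "b"};    // columns declared with a statistic
    const std::string stat_suffix = ".stat";                // analogous to STAT_FILE_SUFFIX

    std::vector<fs::path> loaded;
    for (const auto & column : columns)
    {
        fs::path file = part_path / ("statistic_" + column + stat_suffix); // assumed naming
        if (!fs::exists(file))
        {
            std::cout << "Cannot find stats file " << file << ", skipping\n";
            continue; // a missing file is not an error: the stat may simply not be built yet
        }
        loaded.push_back(file); // the real code deserializes a sketch from a CompressedReadBuffer here
    }
    std::cout << "Loaded " << loaded.size() << " statistic file(s)\n";
}
```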

View File

@ -17,6 +17,7 @@
#include <Storages/MergeTree/MergeTreeDataPartChecksum.h>
#include <Storages/MergeTree/MergeTreeDataPartTTLInfo.h>
#include <Storages/MergeTree/MergeTreeIOSettings.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/MergeTree/KeyCondition.h>
#include <Storages/MergeTree/MergeTreeDataPartBuilder.h>
#include <Storages/ColumnsDescription.h>
@ -103,6 +104,7 @@ public:
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity) = 0;
@ -168,6 +170,8 @@ public:
void remove();
Statistics loadStatistics() const;
/// Initialize columns (from columns.txt if exists, or create from column files if not).
/// Load various metadata into memory: checksums from checksums.txt, index if required, etc.
void loadColumnsChecksumsIndexes(bool require_columns_checksums, bool check_consistency);

View File

@ -1,4 +1,5 @@
#include "Storages/MergeTree/IDataPartStorage.h"
#include <Storages/MergeTree/IDataPartStorage.h>
#include <Storages/Statistics/Statistics.h>
#include <Storages/MergeTree/MergeTask.h>
#include <memory>
@ -373,6 +374,7 @@ bool MergeTask::ExecuteAndFinalizeHorizontalPart::prepare()
global_ctx->metadata_snapshot,
global_ctx->merging_columns,
MergeTreeIndexFactory::instance().getMany(global_ctx->metadata_snapshot->getSecondaryIndices()),
MergeTreeStatisticsFactory::instance().getMany(global_ctx->metadata_snapshot->getColumns()),
ctx->compression_codec,
global_ctx->txn,
/*reset_columns=*/ true,
@ -590,6 +592,7 @@ void MergeTask::VerticalMergeStage::prepareVerticalMergeForOneColumn() const
/// because all of them were already recalculated and written
/// as key part of vertical merge
std::vector<MergeTreeIndexPtr>{},
std::vector<StatisticPtr>{}, /// TODO: think about it
&global_ctx->written_offset_columns,
global_ctx->to->getIndexGranularity());

View File

@ -76,6 +76,7 @@
#include <Storages/MergeTree/MergeTreeDataPartCompact.h>
#include <Storages/MergeTree/MergeTreeDataPartInMemory.h>
#include <Storages/MergeTree/MergeTreeDataPartWide.h>
#include <Storages/Statistics/Estimator.h>
#include <Storages/MergeTree/MergeTreeSelectProcessor.h>
#include <Storages/MergeTree/checkDataPart.h>
#include <Storages/MutationCommands.h>
@ -444,6 +445,50 @@ StoragePolicyPtr MergeTreeData::getStoragePolicy() const
return storage_policy;
}
ConditionEstimator MergeTreeData::getConditionEstimatorByPredicate(const SelectQueryInfo & query_info, const StorageSnapshotPtr & storage_snapshot, ContextPtr local_context) const
{
if (!local_context->getSettings().allow_statistic_optimize)
return {};
const auto & parts = assert_cast<const MergeTreeData::SnapshotData &>(*storage_snapshot->data).parts;
if (parts.empty())
{
return {};
}
ASTPtr expression_ast;
ConditionEstimator result;
PartitionPruner partition_pruner(storage_snapshot->metadata, query_info, local_context, true /* strict */);
if (partition_pruner.isUseless())
{
/// Read all partitions.
for (const auto & part : parts)
{
auto stats = part->loadStatistics();
/// TODO: We only have one stats file for every part.
for (const auto & stat : stats)
result.merge(part->info.getPartNameV1(), part->rows_count, stat);
}
}
else
{
for (const auto & part : parts)
{
if (!partition_pruner.canBePruned(*part))
{
auto stats = part->loadStatistics();
for (const auto & stat : stats)
result.merge(part->info.getPartNameV1(), part->rows_count, stat);
}
}
}
return result;
}
bool MergeTreeData::supportsFinal() const
{
return merging_params.mode == MergingParams::Collapsing
@ -611,6 +656,12 @@ void MergeTreeData::checkProperties(
}
}
for (const auto & col : new_metadata.columns)
{
if (col.stat)
MergeTreeStatisticsFactory::instance().validate(*col.stat, col.type);
}
checkKeyExpression(*new_sorting_key.expression, new_sorting_key.sample_block, "Sorting", allow_nullable_key_);
}
@ -3121,6 +3172,17 @@ void MergeTreeData::checkAlterIsPossible(const AlterCommands & commands, Context
{
columns_to_check_conversion.push_back(
new_metadata.getColumns().getPhysical(command.column_name));
const auto & old_column = old_metadata.getColumns().get(command.column_name);
if (old_column.stat)
{
const auto & new_column = new_metadata.getColumns().get(command.column_name);
if (!old_column.type->equals(*new_column.type))
throw Exception(ErrorCodes::ALTER_OF_COLUMN_IS_FORBIDDEN,
"ALTER types of column {} with statistic is not not safe "
"because it can change the representation of statistic",
backQuoteIfNeed(command.column_name));
}
}
}
}
@ -4793,7 +4855,6 @@ void MergeTreeData::movePartitionToDisk(const ASTPtr & partition, const String &
if (parts.empty())
{
String no_parts_to_move_message;
if (moving_part)
throw Exception(ErrorCodes::UNKNOWN_DISK, "Part '{}' is already on disk '{}'", partition_id, disk->getName());
else
@ -4875,7 +4936,6 @@ void MergeTreeData::movePartitionToVolume(const ASTPtr & partition, const String
if (parts.empty())
{
String no_parts_to_move_message;
if (moving_part)
throw Exception(ErrorCodes::UNKNOWN_DISK, "Part '{}' is already on volume '{}'", partition_id, volume->getName());
else
@ -8219,7 +8279,9 @@ std::pair<MergeTreeData::MutableDataPartPtr, scope_guard> MergeTreeData::createE
const auto & index_factory = MergeTreeIndexFactory::instance();
MergedBlockOutputStream out(new_data_part, metadata_snapshot, columns,
index_factory.getMany(metadata_snapshot->getSecondaryIndices()), compression_codec, txn);
index_factory.getMany(metadata_snapshot->getSecondaryIndices()),
Statistics{},
compression_codec, txn);
bool sync_on_insert = settings->fsync_after_insert;
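getConditionEstimatorByPredicate above is the MergeTree override of the new IStorage hook: it returns an empty estimator unless allow_statistic_optimize is enabled, otherwise it walks the snapshot's parts, skips those the partition pruner can eliminate, and merges each remaining part's loaded statistics into one ConditionEstimator keyed by part name and row count. A compact sketch of that prune-then-merge loop, with toy types standing in for the real estimator and pruner:

```cpp
// Sketch of the prune-then-merge loop behind the condition estimator.
#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <vector>

struct Part
{
    std::string name;
    std::uint64_t rows = 0;
    double stat_selectivity = 1.0; // stand-in for a per-part column statistic (e.g. a tdigest)
};

struct Estimator
{
    std::uint64_t total_rows = 0;
    double estimated_rows = 0;

    void merge(const Part & part)
    {
        total_rows += part.rows;
        estimated_rows += part.stat_selectivity * static_cast<double>(part.rows);
    }
};

int main()
{
    const bool allow_statistic_optimize = true;              // mirrors the setting gate
    const std::vector<Part> parts = {{"all_1_1_0", 1000, 0.10},
                                     {"all_2_2_0", 500, 0.50}};

    // Stand-in for the partition pruner: here, nothing is pruned.
    const std::function<bool(const Part &)> can_be_pruned = [](const Part &) { return false; };

    Estimator estimator;
    if (allow_statistic_optimize)
        for (const auto & part : parts)
            if (!can_be_pruned(part))
                estimator.merge(part); // merge this part's statistics into the estimate

    std::cout << "estimated matching rows: " << estimator.estimated_rows
              << " of " << estimator.total_rows << '\n';
}
```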

View File

@ -428,6 +428,8 @@ public:
bool supportsPrewhere() const override { return true; }
ConditionEstimator getConditionEstimatorByPredicate(const SelectQueryInfo &, const StorageSnapshotPtr &, ContextPtr) const override;
bool supportsFinal() const override;
bool supportsSubcolumns() const override { return true; }

View File

@ -53,6 +53,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
const NamesAndTypesList & columns_list,
const StorageMetadataPtr & metadata_snapshot,
const std::vector<MergeTreeIndexPtr> & indices_to_recalc,
const Statistics & stats_to_recalc_,
const CompressionCodecPtr & default_codec_,
const MergeTreeWriterSettings & writer_settings,
const MergeTreeIndexGranularity & computed_index_granularity)
@ -73,7 +74,7 @@ IMergeTreeDataPart::MergeTreeWriterPtr MergeTreeDataPartCompact::getWriter(
return std::make_unique<MergeTreeDataPartWriterCompact>(
shared_from_this(), ordered_columns_list, metadata_snapshot,
indices_to_recalc, getMarksFileExtension(),
indices_to_recalc, stats_to_recalc_, getMarksFileExtension(),
default_codec_, writer_settings, computed_index_granularity);
}

Some files were not shown because too many files have changed in this diff.