Merge branch 'master' of github.com:ClickHouse/ClickHouse into revert-60216-revert-59697-check-stack-size-in-parser

This commit is contained in:
Alexey Milovidov 2024-02-27 17:30:32 +01:00
commit 0f3c9963d1
85 changed files with 1109 additions and 1119 deletions

View File

@ -74,6 +74,10 @@ Specifying the `sharding_key` is necessary for the following:
`fsync_directories` - do the `fsync` for directories. Guarantees that the OS refreshed directory metadata after operations related to background inserts on Distributed table (after insert, after sending the data to shard, etc.).
#### skip_unavailable_shards
`skip_unavailable_shards` - If true, ClickHouse silently skips unavailable shards. Shard is marked as unavailable when: 1) The shard cannot be reached due to a connection failure. 2) Shard is unresolvable through DNS. 3) Table does not exist on the shard. Default false.
#### bytes_to_throw_insert
`bytes_to_throw_insert` - if more than this number of compressed bytes will be pending for background INSERT, an exception will be thrown. 0 - do not throw. Default 0.
@ -102,6 +106,10 @@ Specifying the `sharding_key` is necessary for the following:
`background_insert_max_sleep_time_ms` - same as [distributed_background_insert_max_sleep_time_ms](../../../operations/settings/settings.md#distributed_background_insert_max_sleep_time_ms)
#### flush_on_detach
`flush_on_detach` - Flush data to remote nodes on DETACH/DROP/server shutdown. Default true.
:::note
**Durability settings** (`fsync_...`):

View File

@ -79,10 +79,7 @@ It is recommended to use official pre-compiled `deb` packages for Debian or Ubun
#### Setup the Debian repository
``` bash
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
GNUPGHOME=$(mktemp -d)
sudo GNUPGHOME="$GNUPGHOME" gpg --no-default-keyring --keyring /usr/share/keyrings/clickhouse-keyring.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754
sudo rm -rf "$GNUPGHOME"
sudo chmod +r /usr/share/keyrings/clickhouse-keyring.gpg
sudo gpg --no-default-keyring --keyring /usr/share/keyrings/clickhouse-keyring.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list

View File

@ -2041,7 +2041,7 @@ Possible values:
- 0 — Disabled.
- 1 — Enabled.
Default value: 1.
Default value: 0.
By default, async inserts are inserted into replicated tables by the `INSERT` statement enabling [async_insert](#async-insert) are deduplicated (see [Data Replication](../../engines/table-engines/mergetree-family/replication.md)).
For the replicated tables, by default, only 10000 of the most recent inserts for each partition are deduplicated (see [replicated_deduplication_window_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-async-inserts), [replicated_deduplication_window_seconds_for_async_inserts](merge-tree-settings.md/#replicated-deduplication-window-seconds-async-inserts)).
@ -3449,7 +3449,7 @@ Has an effect only when the connection is made through the MySQL wire protocol.
- 0 - Use `BLOB`.
- 1 - Use `TEXT`.
Default value: `0`.
Default value: `1`.
## mysql_map_fixed_string_to_text_in_show_columns {#mysql_map_fixed_string_to_text_in_show_columns}
@ -3460,7 +3460,7 @@ Has an effect only when the connection is made through the MySQL wire protocol.
- 0 - Use `BLOB`.
- 1 - Use `TEXT`.
Default value: `0`.
Default value: `1`.
## execute_merges_on_single_replica_time_threshold {#execute-merges-on-single-replica-time-threshold}
@ -3710,7 +3710,7 @@ Default value: `0`.
## allow_experimental_live_view {#allow-experimental-live-view}
Allows creation of experimental [live views](../../sql-reference/statements/create/view.md/#live-view).
Allows creation of a deprecated LIVE VIEW.
Possible values:
@ -3721,21 +3721,15 @@ Default value: `0`.
## live_view_heartbeat_interval {#live-view-heartbeat-interval}
Sets the heartbeat interval in seconds to indicate [live view](../../sql-reference/statements/create/view.md/#live-view) is alive .
Default value: `15`.
Deprecated.
## max_live_view_insert_blocks_before_refresh {#max-live-view-insert-blocks-before-refresh}
Sets the maximum number of inserted blocks after which mergeable blocks are dropped and query for [live view](../../sql-reference/statements/create/view.md/#live-view) is re-executed.
Default value: `64`.
Deprecated.
## periodic_live_view_refresh {#periodic-live-view-refresh}
Sets the interval in seconds after which periodically refreshed [live view](../../sql-reference/statements/create/view.md/#live-view) is forced to refresh.
Default value: `60`.
Deprecated.
## http_connection_timeout {#http_connection_timeout}

View File

@ -111,6 +111,14 @@ On newer Linux kernels transparent huge pages are alright.
$ echo 'madvise' | sudo tee /sys/kernel/mm/transparent_hugepage/enabled
```
If you want to modify the transparent huge pages setting permanently, editing the `/etc/default/grub` to add the `transparent_hugepage=never` to the `GRUB_CMDLINE_LINUX_DEFAULT` option:
```bash
$ GRUB_CMDLINE_LINUX_DEFAULT="transparent_hugepage=madvise ..."
```
After that, run the `sudo update-grub` command then reboot to take effect.
## Hypervisor configuration
If you are using OpenStack, set

View File

@ -0,0 +1,50 @@
---
slug: /en/sql-reference/aggregate-functions/reference/grouparrayintersect
sidebar_position: 115
---
# groupArrayIntersect
Return an intersection of given arrays (Return all items of arrays, that are in all given arrays).
**Syntax**
``` sql
groupArrayIntersect(x)
```
**Arguments**
- `x` — Argument (column name or expression).
**Returned values**
- Array that contains elements that are in all arrays.
Type: [Array](../../data-types/array.md).
**Examples**
Consider table `numbers`:
``` text
┌─a──────────────┐
│ [1,2,4] │
│ [1,5,2,8,-1,0] │
│ [1,5,7,5,8,2] │
└────────────────┘
```
Query with column name as argument:
``` sql
SELECT groupArrayIntersect(a) as intersection FROM numbers;
```
Result:
```text
┌─intersection──────┐
│ [1, 2] │
└───────────────────┘
```

View File

@ -55,6 +55,7 @@ ClickHouse-specific aggregate functions:
- [groupArrayMovingSum](/docs/en/sql-reference/aggregate-functions/reference/grouparraymovingsum.md)
- [groupArraySample](./grouparraysample.md)
- [groupArraySorted](/docs/en/sql-reference/aggregate-functions/reference/grouparraysorted.md)
- [groupArrayIntersect](./grouparrayintersect.md)
- [groupBitAnd](/docs/en/sql-reference/aggregate-functions/reference/groupbitand.md)
- [groupBitOr](/docs/en/sql-reference/aggregate-functions/reference/groupbitor.md)
- [groupBitXor](/docs/en/sql-reference/aggregate-functions/reference/groupbitxor.md)

View File

@ -5,25 +5,25 @@ sidebar_position: 221
# stochasticLinearRegression
This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size and has few methods for updating weights ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (used by default), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
This function implements stochastic linear regression. It supports custom parameters for learning rate, L2 regularization coefficient, mini-batch size, and has a few methods for updating weights ([Adam](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Adam) (used by default), [simple SGD](https://en.wikipedia.org/wiki/Stochastic_gradient_descent), [Momentum](https://en.wikipedia.org/wiki/Stochastic_gradient_descent#Momentum), and [Nesterov](https://mipt.ru/upload/medialibrary/d7e/41-91.pdf)).
### Parameters
There are 4 customizable parameters. They are passed to the function sequentially, but there is no need to pass all four - default values will be used, however good model required some parameter tuning.
``` text
stochasticLinearRegression(1.0, 1.0, 10, 'SGD')
stochasticLinearRegression(0.00001, 0.1, 15, 'Adam')
```
1. `learning rate` is the coefficient on step length, when gradient descent step is performed. Too big learning rate may cause infinite weights of the model. Default is `0.00001`.
1. `learning rate` is the coefficient on step length, when the gradient descent step is performed. A learning rate that is too big may cause infinite weights of the model. Default is `0.00001`.
2. `l2 regularization coefficient` which may help to prevent overfitting. Default is `0.1`.
3. `mini-batch size` sets the number of elements, which gradients will be computed and summed to perform one step of gradient descent. Pure stochastic descent uses one element, however having small batches(about 10 elements) make gradient steps more stable. Default is `15`.
4. `method for updating weights`, they are: `Adam` (by default), `SGD`, `Momentum`, `Nesterov`. `Momentum` and `Nesterov` require little bit more computations and memory, however they happen to be useful in terms of speed of convergence and stability of stochastic gradient methods.
3. `mini-batch size` sets the number of elements, which gradients will be computed and summed to perform one step of gradient descent. Pure stochastic descent uses one element, however, having small batches (about 10 elements) makes gradient steps more stable. Default is `15`.
4. `method for updating weights`, they are: `Adam` (by default), `SGD`, `Momentum`, and `Nesterov`. `Momentum` and `Nesterov` require a little bit more computations and memory, however, they happen to be useful in terms of speed of convergence and stability of stochastic gradient methods.
### Usage
`stochasticLinearRegression` is used in two steps: fitting the model and predicting on new data. In order to fit the model and save its state for later usage we use `-State` combinator, which basically saves the state (model weights, etc).
To predict we use function [evalMLMethod](../../../sql-reference/functions/machine-learning-functions.md#machine_learning_methods-evalmlmethod), which takes a state as an argument as well as features to predict on.
`stochasticLinearRegression` is used in two steps: fitting the model and predicting on new data. In order to fit the model and save its state for later usage, we use the `-State` combinator, which saves the state (e.g. model weights).
To predict, we use the function [evalMLMethod](../../../sql-reference/functions/machine-learning-functions.md#machine_learning_methods-evalmlmethod), which takes a state as an argument as well as features to predict on.
<a name="stochasticlinearregression-usage-fitting"></a>
@ -44,12 +44,12 @@ stochasticLinearRegressionState(0.1, 0.0, 5, 'SGD')(target, param1, param2)
AS state FROM train_data;
```
Here we also need to insert data into `train_data` table. The number of parameters is not fixed, it depends only on number of arguments, passed into `linearRegressionState`. They all must be numeric values.
Note that the column with target value(which we would like to learn to predict) is inserted as the first argument.
Here, we also need to insert data into the `train_data` table. The number of parameters is not fixed, it depends only on the number of arguments passed into `linearRegressionState`. They all must be numeric values.
Note that the column with target value (which we would like to learn to predict) is inserted as the first argument.
**2.** Predicting
After saving a state into the table, we may use it multiple times for prediction, or even merge with other states and create new even better models.
After saving a state into the table, we may use it multiple times for prediction or even merge with other states and create new, even better models.
``` sql
WITH (SELECT state FROM your_model) AS model SELECT

View File

@ -780,8 +780,52 @@ If executed in the context of a distributed table, this function generates a nor
## version()
Returns the server version as a string.
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise it produces a constant value.
Returns the current version of ClickHouse as a string in the form of:
- Major version
- Minor version
- Patch version
- Number of commits since the previous stable release.
```plaintext
major_version.minor_version.patch_version.number_of_commits_since_the_previous_stable_release
```
If executed in the context of a distributed table, this function generates a normal column with values relevant to each shard. Otherwise, it produces a constant value.
**Syntax**
```sql
version()
```
**Arguments**
None.
**Returned value**
Type: [String](../data-types/string)
**Implementation details**
None.
**Example**
Query:
```sql
SELECT version()
```
**Result**:
```response
┌─version()─┐
│ 24.2.1.1 │
└───────────┘
```
## buildId()

View File

@ -176,7 +176,7 @@ INSERT INTO infile_globs FROM INFILE 'input_?.csv' FORMAT CSV;
```
:::
## Inserting into Table Function
## Inserting using a Table Function
Data can be inserted into tables referenced by [table functions](../../sql-reference/table-functions/index.md).
@ -204,7 +204,7 @@ Result:
└─────┴───────────────────────┘
```
## Inserts into ClickHouse Cloud
## Inserting into ClickHouse Cloud
By default, services on ClickHouse Cloud provide multiple replicas for high availability. When you connect to a service, a connection is established to one of these replicas.
@ -218,6 +218,12 @@ SELECT .... SETTINGS select_sequential_consistency = 1;
Note that using `select_sequential_consistency` will increase the load on ClickHouse Keeper (used by ClickHouse Cloud internally) and may result in slower performance depending on the load on the service. We recommend against enabling this setting unless necessary. The recommended approach is to execute read/writes in the same session or to use a client driver that uses the native protocol (and thus supports sticky connections).
## Inserting into a replicated setup
In a replicated setup, data will be visible on other replicas after it has been replicated. Data begins being replicated (downloaded on other replicas) immediately after an `INSERT`. This differs from ClickHouse Cloud, where data is immediately written to shared storage and replicas subscribe to metadata changes.
Note that for replicated setups, `INSERTs` can sometimes take a considerable amount of time (in the order of one second) as it requires committing to ClickHouse Keeper for distributed consensus. Using S3 for storage also adds additional latency.
## Performance Considerations
`INSERT` sorts the input data by primary key and splits them into partitions by a partition key. If you insert data into several partitions at once, it can significantly reduce the performance of the `INSERT` query. To avoid this:
@ -230,7 +236,15 @@ Performance will not decrease if:
- Data is added in real time.
- You upload data that is usually sorted by time.
It's also possible to asynchronously insert data in small but frequent inserts. The data from such insertions is combined into batches and then safely inserted into a table. To enable the asynchronous mode, switch on the [async_insert](../../operations/settings/settings.md#async-insert) setting. Note that asynchronous insertions are supported only over HTTP protocol, and deduplication is not supported for them.
### Asynchronous inserts
It is possible to asynchronously insert data in small but frequent inserts. The data from such insertions is combined into batches and then safely inserted into a table. To use asynchronous inserts, enable the [`async_insert`](../../operations/settings/settings.md#async-insert) setting.
Using `async_insert` or the [`Buffer` table engine](/en/engines/table-engines/special/buffer) results in additional buffering.
### Large or long-running inserts
When you are inserting large amounts of data, ClickHouse will optimize write performance through a process called "squashing". Small blocks of inserted data in memory are merged and squashed into larger blocks before being written to disk. Squashing reduces the overhead associated with each write operation. In this process, inserted data will be available to query after ClickHouse completes writing each [`max_insert_block_size`](/en/operations/settings/settings#max_insert_block_size) rows.
**See Also**

View File

@ -3258,7 +3258,7 @@ SELECT * FROM test2;
## allow_experimental_live_view {#allow-experimental-live-view}
Включает экспериментальную возможность использования [LIVE-представлений](../../sql-reference/statements/create/view.md#live-view).
Включает устаревшую возможность использования [LIVE-представлений](../../sql-reference/statements/create/view.md#live-view).
Возможные значения:
- 0 — живые представления не поддерживаются.
@ -3268,21 +3268,15 @@ SELECT * FROM test2;
## live_view_heartbeat_interval {#live-view-heartbeat-interval}
Задает интервал в секундах для периодической проверки существования [LIVE VIEW](../../sql-reference/statements/create/view.md#live-view).
Значение по умолчанию: `15`.
Устарело.
## max_live_view_insert_blocks_before_refresh {#max-live-view-insert-blocks-before-refresh}
Задает наибольшее число вставок, после которых запрос на формирование [LIVE VIEW](../../sql-reference/statements/create/view.md#live-view) исполняется снова.
Значение по умолчанию: `64`.
Устарело.
## periodic_live_view_refresh {#periodic-live-view-refresh}
Задает время в секундах, по истечении которого [LIVE VIEW](../../sql-reference/statements/create/view.md#live-view) с установленным автообновлением обновляется.
Значение по умолчанию: `60`.
Устарело.
## check_query_single_value_result {#check_query_single_value_result}

View File

@ -280,9 +280,6 @@ GRANT INSERT(x,y) ON db.table TO john
- `ALTER MOVE PARTITION`. 级别: `TABLE`. 别名: `ALTER MOVE PART`, `MOVE PARTITION`, `MOVE PART`
- `ALTER FETCH PARTITION`. 级别: `TABLE`. 别名: `FETCH PARTITION`
- `ALTER FREEZE PARTITION`. 级别: `TABLE`. 别名: `FREEZE PARTITION`
- `ALTER VIEW` 级别: `GROUP`
- `ALTER VIEW REFRESH`. 级别: `VIEW`. 别名: `ALTER LIVE VIEW REFRESH`, `REFRESH VIEW`
- `ALTER VIEW MODIFY QUERY`. 级别: `VIEW`. 别名: `ALTER TABLE MODIFY QUERY`
如何对待该层级的示例:
- `ALTER` 权限包含所有其它 `ALTER *` 的权限

View File

@ -1392,13 +1392,27 @@
<!-- <host_name>replica</host_name> -->
</distributed_ddl>
<!-- Settings to fine tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!-- Settings to fine-tune MergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<merge_tree>
<max_suspicious_broken_parts>5</max_suspicious_broken_parts>
</merge_tree>
-->
<!-- Settings to fine-tune ReplicatedMergeTree tables. See documentation in source code, in MergeTreeSettings.h -->
<!--
<replicated_merge_tree>
<max_replicated_fetches_network_bandwidth>1000000000</max_replicated_fetches_network_bandwidth>
</replicated_merge_tree>
-->
<!-- Settings to fine-tune Distributed tables. See documentation in source code, in DistributedSettings.h -->
<!--
<distributed>
<flush_on_detach>false</flush_on_detach>
</distributed>
-->
<!-- Protection from accidental DROP.
If size of a MergeTree table is greater than max_table_size_to_drop (in bytes) than table could not be dropped with any DROP query.
If you want do delete one table and don't want to change clickhouse-server config, you could create special file <clickhouse-path>/flags/force_drop_table and make DROP once.

View File

@ -80,13 +80,12 @@ enum class AccessType
M(ALTER_TABLE, "", GROUP, ALTER) \
M(ALTER_DATABASE, "", GROUP, ALTER) \
\
M(ALTER_VIEW_REFRESH, "ALTER LIVE VIEW REFRESH, REFRESH VIEW", VIEW, ALTER_VIEW) \
M(ALTER_VIEW_MODIFY_QUERY, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \
M(ALTER_VIEW_MODIFY_REFRESH, "ALTER TABLE MODIFY QUERY", VIEW, ALTER_VIEW) \
M(ALTER_VIEW, "", GROUP, ALTER) /* allows to execute ALTER VIEW REFRESH, ALTER VIEW MODIFY QUERY, ALTER VIEW MODIFY REFRESH;
implicitly enabled by the grant ALTER_TABLE */\
\
M(ALTER, "", GROUP, ALL) /* allows to execute ALTER {TABLE|LIVE VIEW} */\
M(ALTER, "", GROUP, ALL) /* allows to execute ALTER TABLE */\
\
M(CREATE_DATABASE, "", DATABASE, CREATE) /* allows to execute {CREATE|ATTACH} DATABASE */\
M(CREATE_TABLE, "", TABLE, CREATE) /* allows to execute {CREATE|ATTACH} {TABLE|VIEW} */\

View File

@ -0,0 +1,439 @@
#include <cassert>
#include <memory>
#include <IO/WriteHelpers.h>
#include <IO/ReadHelpers.h>
#include <IO/ReadHelpersArena.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeString.h>
#include <Columns/ColumnArray.h>
#include <Common/HashTable/HashSet.h>
#include <Common/HashTable/HashTableKeyHolder.h>
#include <Common/assert_cast.h>
#include <AggregateFunctions/IAggregateFunction.h>
#include <AggregateFunctions/KeyHolderHelpers.h>
#include <Core/Field.h>
#include <AggregateFunctions/AggregateFunctionFactory.h>
#include <AggregateFunctions/Helpers.h>
#include <AggregateFunctions/FactoryHelpers.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDate32.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
namespace DB
{
struct Settings;
namespace ErrorCodes
{
extern const int ILLEGAL_TYPE_OF_ARGUMENT;
extern const int NUMBER_OF_ARGUMENTS_DOESNT_MATCH;
}
struct Settings;
template <typename T>
struct AggregateFunctionGroupArrayIntersectData
{
using Set = HashSet<T>;
Set value;
UInt64 version = 0;
};
/// Puts all values to the hash set. Returns an array of unique values. Implemented for numeric types.
template <typename T>
class AggregateFunctionGroupArrayIntersect
: public IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectData<T>, AggregateFunctionGroupArrayIntersect<T>>
{
private:
using State = AggregateFunctionGroupArrayIntersectData<T>;
public:
AggregateFunctionGroupArrayIntersect(const DataTypePtr & argument_type, const Array & parameters_)
: IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectData<T>,
AggregateFunctionGroupArrayIntersect<T>>({argument_type}, parameters_, argument_type) {}
AggregateFunctionGroupArrayIntersect(const DataTypePtr & argument_type, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectData<T>,
AggregateFunctionGroupArrayIntersect<T>>({argument_type}, parameters_, result_type_) {}
String getName() const override { return "GroupArrayIntersect"; }
bool allocatesMemoryInArena() const override { return false; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
auto & version = this->data(place).version;
auto & set = this->data(place).value;
const auto data_column = assert_cast<const ColumnArray &>(*columns[0]).getDataPtr();
const auto & offsets = assert_cast<const ColumnArray &>(*columns[0]).getOffsets();
const size_t offset = offsets[static_cast<ssize_t>(row_num) - 1];
const auto arr_size = offsets[row_num] - offset;
++version;
if (version == 1)
{
for (size_t i = 0; i < arr_size; ++i)
set.insert(static_cast<T>((*data_column)[offset + i].get<T>()));
}
else if (!set.empty())
{
typename State::Set new_set;
for (size_t i = 0; i < arr_size; ++i)
{
typename State::Set::LookupResult set_value = set.find(static_cast<T>((*data_column)[offset + i].get<T>()));
if (set_value != nullptr)
new_set.insert(static_cast<T>((*data_column)[offset + i].get<T>()));
}
set = std::move(new_set);
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena *) const override
{
auto & set = this->data(place).value;
const auto & rhs_set = this->data(rhs).value;
if (this->data(rhs).version == 0)
return;
UInt64 version = this->data(place).version++;
if (version == 0)
{
for (auto & rhs_elem : rhs_set)
set.insert(rhs_elem.getValue());
return;
}
if (!set.empty())
{
auto create_new_set = [](auto & lhs_val, auto & rhs_val)
{
typename State::Set new_set;
for (auto & lhs_elem : lhs_val)
{
auto res = rhs_val.find(lhs_elem.getValue());
if (res != nullptr)
new_set.insert(lhs_elem.getValue());
}
return new_set;
};
auto new_set = rhs_set.size() < set.size() ? create_new_set(rhs_set, set) : create_new_set(set, rhs_set);
set = std::move(new_set);
}
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & set = this->data(place).value;
auto version = this->data(place).version;
writeVarUInt(version, buf);
writeVarUInt(set.size(), buf);
for (const auto & elem : set)
writeIntBinary(elem.getValue(), buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena *) const override
{
readVarUInt(this->data(place).version, buf);
this->data(place).value.read(buf);
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
const auto & set = this->data(place).value;
offsets_to.push_back(offsets_to.back() + set.size());
typename ColumnVector<T>::Container & data_to = assert_cast<ColumnVector<T> &>(arr_to.getData()).getData();
size_t old_size = data_to.size();
data_to.resize(old_size + set.size());
size_t i = 0;
for (auto it = set.begin(); it != set.end(); ++it, ++i)
data_to[old_size + i] = it->getValue();
}
};
/// Generic implementation, it uses serialized representation as object descriptor.
struct AggregateFunctionGroupArrayIntersectGenericData
{
using Set = HashSet<StringRef>;
Set value;
UInt64 version = 0;
};
/** Template parameter with true value should be used for columns that store their elements in memory continuously.
* For such columns GroupArrayIntersect() can be implemented more efficiently (especially for small numeric arrays).
*/
template <bool is_plain_column = false>
class AggregateFunctionGroupArrayIntersectGeneric
: public IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectGenericData,
AggregateFunctionGroupArrayIntersectGeneric<is_plain_column>>
{
const DataTypePtr & input_data_type;
using State = AggregateFunctionGroupArrayIntersectGenericData;
public:
AggregateFunctionGroupArrayIntersectGeneric(const DataTypePtr & input_data_type_, const Array & parameters_)
: IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectGenericData, AggregateFunctionGroupArrayIntersectGeneric<is_plain_column>>({input_data_type_}, parameters_, input_data_type_)
, input_data_type(this->argument_types[0]) {}
AggregateFunctionGroupArrayIntersectGeneric(const DataTypePtr & input_data_type_, const Array & parameters_, const DataTypePtr & result_type_)
: IAggregateFunctionDataHelper<AggregateFunctionGroupArrayIntersectGenericData, AggregateFunctionGroupArrayIntersectGeneric<is_plain_column>>({input_data_type_}, parameters_, result_type_)
, input_data_type(result_type_) {}
String getName() const override { return "GroupArrayIntersect"; }
bool allocatesMemoryInArena() const override { return true; }
void add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena * arena) const override
{
auto & set = this->data(place).value;
auto & version = this->data(place).version;
bool inserted;
State::Set::LookupResult it;
const auto data_column = assert_cast<const ColumnArray &>(*columns[0]).getDataPtr();
const auto & offsets = assert_cast<const ColumnArray &>(*columns[0]).getOffsets();
const size_t offset = offsets[static_cast<ssize_t>(row_num) - 1];
const auto arr_size = offsets[row_num] - offset;
++version;
if (version == 1)
{
for (size_t i = 0; i < arr_size; ++i)
{
if constexpr (is_plain_column)
set.emplace(ArenaKeyHolder{data_column->getDataAt(offset + i), *arena}, it, inserted);
else
{
const char * begin = nullptr;
StringRef serialized = data_column->serializeValueIntoArena(offset + i, *arena, begin);
assert(serialized.data != nullptr);
set.emplace(SerializedKeyHolder{serialized, *arena}, it, inserted);
}
}
}
else if (!set.empty())
{
typename State::Set new_set;
for (size_t i = 0; i < arr_size; ++i)
{
if constexpr (is_plain_column)
{
it = set.find(data_column->getDataAt(offset + i));
if (it != nullptr)
new_set.emplace(ArenaKeyHolder{data_column->getDataAt(offset + i), *arena}, it, inserted);
}
else
{
const char * begin = nullptr;
StringRef serialized = data_column->serializeValueIntoArena(offset + i, *arena, begin);
assert(serialized.data != nullptr);
it = set.find(serialized);
if (it != nullptr)
new_set.emplace(SerializedKeyHolder{serialized, *arena}, it, inserted);
}
}
set = std::move(new_set);
}
}
void merge(AggregateDataPtr __restrict place, ConstAggregateDataPtr rhs, Arena * arena) const override
{
auto & set = this->data(place).value;
const auto & rhs_value = this->data(rhs).value;
if (this->data(rhs).version == 0)
return;
UInt64 version = this->data(place).version++;
if (version == 0)
{
bool inserted;
State::Set::LookupResult it;
for (auto & rhs_elem : rhs_value)
{
set.emplace(ArenaKeyHolder{rhs_elem.getValue(), *arena}, it, inserted);
}
}
else if (!set.empty())
{
auto create_new_map = [](auto & lhs_val, auto & rhs_val)
{
typename State::Set new_map;
for (auto & lhs_elem : lhs_val)
{
auto val = rhs_val.find(lhs_elem.getValue());
if (val != nullptr)
new_map.insert(lhs_elem.getValue());
}
return new_map;
};
auto new_map = rhs_value.size() < set.size() ? create_new_map(rhs_value, set) : create_new_map(set, rhs_value);
set = std::move(new_map);
}
}
void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & buf, std::optional<size_t> /* version */) const override
{
auto & set = this->data(place).value;
auto & version = this->data(place).version;
writeVarUInt(version, buf);
writeVarUInt(set.size(), buf);
for (const auto & elem : set)
writeStringBinary(elem.getValue(), buf);
}
void deserialize(AggregateDataPtr __restrict place, ReadBuffer & buf, std::optional<size_t> /* version */, Arena * arena) const override
{
auto & set = this->data(place).value;
auto & version = this->data(place).version;
size_t size;
readVarUInt(version, buf);
readVarUInt(size, buf);
set.reserve(size);
UInt64 elem_version;
for (size_t i = 0; i < size; ++i)
{
auto key = readStringBinaryInto(*arena, buf);
readVarUInt(elem_version, buf);
set.insert(key);
}
}
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
ColumnArray & arr_to = assert_cast<ColumnArray &>(to);
ColumnArray::Offsets & offsets_to = arr_to.getOffsets();
IColumn & data_to = arr_to.getData();
auto & set = this->data(place).value;
offsets_to.push_back(offsets_to.back() + set.size());
for (auto & elem : set)
{
if constexpr (is_plain_column)
data_to.insertData(elem.getValue().data, elem.getValue().size);
else
std::ignore = data_to.deserializeAndInsertFromArena(elem.getValue().data);
}
}
};
namespace
{
/// Substitute return type for Date and DateTime
class AggregateFunctionGroupArrayIntersectDate : public AggregateFunctionGroupArrayIntersect<DataTypeDate::FieldType>
{
public:
explicit AggregateFunctionGroupArrayIntersectDate(const DataTypePtr & argument_type, const Array & parameters_)
: AggregateFunctionGroupArrayIntersect<DataTypeDate::FieldType>(argument_type, parameters_, createResultType()) {}
static DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDate>()); }
};
class AggregateFunctionGroupArrayIntersectDateTime : public AggregateFunctionGroupArrayIntersect<DataTypeDateTime::FieldType>
{
public:
explicit AggregateFunctionGroupArrayIntersectDateTime(const DataTypePtr & argument_type, const Array & parameters_)
: AggregateFunctionGroupArrayIntersect<DataTypeDateTime::FieldType>(argument_type, parameters_, createResultType()) {}
static DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>()); }
};
class AggregateFunctionGroupArrayIntersectDate32 : public AggregateFunctionGroupArrayIntersect<DataTypeDate32::FieldType>
{
public:
explicit AggregateFunctionGroupArrayIntersectDate32(const DataTypePtr & argument_type, const Array & parameters_)
: AggregateFunctionGroupArrayIntersect<DataTypeDate32::FieldType>(argument_type, parameters_, createResultType()) {}
static DataTypePtr createResultType() { return std::make_shared<DataTypeArray>(std::make_shared<DataTypeDate32>()); }
};
IAggregateFunction * createWithExtraTypes(const DataTypePtr & argument_type, const Array & parameters)
{
WhichDataType which(argument_type);
if (which.idx == TypeIndex::Date) return new AggregateFunctionGroupArrayIntersectDate(argument_type, parameters);
else if (which.idx == TypeIndex::DateTime) return new AggregateFunctionGroupArrayIntersectDateTime(argument_type, parameters);
else if (which.idx == TypeIndex::Date32) return new AggregateFunctionGroupArrayIntersectDate32(argument_type, parameters);
else if (which.idx == TypeIndex::DateTime64)
{
const auto * datetime64_type = dynamic_cast<const DataTypeDateTime64 *>(argument_type.get());
const auto return_type = std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime64>(datetime64_type->getScale()));
return new AggregateFunctionGroupArrayIntersectGeneric<true>(argument_type, parameters, return_type);
}
else
{
/// Check that we can use plain version of AggregateFunctionGroupArrayIntersectGeneric
if (argument_type->isValueUnambiguouslyRepresentedInContiguousMemoryRegion())
return new AggregateFunctionGroupArrayIntersectGeneric<true>(argument_type, parameters);
else
return new AggregateFunctionGroupArrayIntersectGeneric<false>(argument_type, parameters);
}
}
inline AggregateFunctionPtr createAggregateFunctionGroupArrayIntersectImpl(const std::string & name, const DataTypePtr & argument_type, const Array & parameters)
{
const auto & nested_type = dynamic_cast<const DataTypeArray &>(*argument_type).getNestedType();
AggregateFunctionPtr res(createWithNumericType<AggregateFunctionGroupArrayIntersect, const DataTypePtr &>(*nested_type, argument_type, parameters));
if (!res)
{
res = AggregateFunctionPtr(createWithExtraTypes(argument_type, parameters));
}
if (!res)
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}",
argument_type->getName(), name);
return res;
}
AggregateFunctionPtr createAggregateFunctionGroupArrayIntersect(
const std::string & name, const DataTypes & argument_types, const Array & parameters, const Settings *)
{
assertUnary(name, argument_types);
if (!WhichDataType(argument_types.at(0)).isArray())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Aggregate function groupArrayIntersect accepts only array type argument.");
if (!parameters.empty())
throw Exception(ErrorCodes::NUMBER_OF_ARGUMENTS_DOESNT_MATCH,
"Incorrect number of parameters for aggregate function {}, should be 0", name);
return createAggregateFunctionGroupArrayIntersectImpl(name, argument_types[0], parameters);
}
}
void registerAggregateFunctionGroupArrayIntersect(AggregateFunctionFactory & factory)
{
AggregateFunctionProperties properties = { .returns_default_when_only_null = false, .is_order_dependent = true };
factory.registerFunction("groupArrayIntersect", { createAggregateFunctionGroupArrayIntersect, properties });
}
}

View File

@ -18,6 +18,7 @@ void registerAggregateFunctionGroupArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArraySorted(AggregateFunctionFactory & factory);
void registerAggregateFunctionGroupUniqArray(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayInsertAt(AggregateFunctionFactory &);
void registerAggregateFunctionGroupArrayIntersect(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileDeterministic(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExact(AggregateFunctionFactory &);
@ -116,6 +117,7 @@ void registerAggregateFunctions()
registerAggregateFunctionGroupArraySorted(factory);
registerAggregateFunctionGroupUniqArray(factory);
registerAggregateFunctionGroupArrayInsertAt(factory);
registerAggregateFunctionGroupArrayIntersect(factory);
registerAggregateFunctionsQuantile(factory);
registerAggregateFunctionsQuantileDeterministic(factory);
registerAggregateFunctionsQuantileExact(factory);

View File

@ -32,8 +32,6 @@ namespace ErrorCodes
M(UInt64, shard_num) \
M(UInt64, replica_num) \
M(Bool, check_parts) \
M(Bool, check_projection_parts) \
M(Bool, allow_backup_broken_projections) \
M(Bool, internal) \
M(String, host_id) \
M(OptionalUUID, backup_uuid)

View File

@ -62,12 +62,6 @@ struct BackupSettings
/// Check checksums of the data parts before writing them to a backup.
bool check_parts = true;
/// Check checksums of the projection data parts before writing them to a backup.
bool check_projection_parts = true;
/// Allow to create backup with broken projections.
bool allow_backup_broken_projections = false;
/// Internal, should not be specified by user.
/// Whether this backup is a part of a distributed backup created by BACKUP ON CLUSTER.
bool internal = false;

View File

@ -592,7 +592,6 @@
M(710, FAULT_INJECTED) \
M(711, FILECACHE_ACCESS_DENIED) \
M(712, TOO_MANY_MATERIALIZED_VIEWS) \
M(713, BROKEN_PROJECTION) \
M(714, UNEXPECTED_CLUSTER) \
M(715, CANNOT_DETECT_FORMAT) \
M(716, CANNOT_FORGET_PARTITION) \

View File

@ -36,7 +36,6 @@ static constexpr auto DEFAULT_BLOCK_SIZE
static constexpr auto DEFAULT_INSERT_BLOCK_SIZE
= 1048449; /// 1048576 - PADDING_FOR_SIMD - (PADDING_FOR_SIMD - 1) bytes padding that we usually have in arrays
static constexpr auto DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC = 60;
static constexpr auto SHOW_CHARS_ON_SYNTAX_ERROR = ptrdiff_t(160);
/// each period reduces the error counter by 2 times
/// too short a period can cause errors to disappear immediately after creation.

View File

@ -224,8 +224,8 @@ class IColumn;
M(Bool, allow_experimental_inverted_index, false, "If it is set to true, allow to use experimental inverted index.", 0) \
\
M(UInt64, mysql_max_rows_to_insert, 65536, "The maximum number of rows in MySQL batch insertion of the MySQL storage engine", 0) \
M(Bool, mysql_map_string_to_text_in_show_columns, false, "If enabled, String type will be mapped to TEXT in SHOW [FULL] COLUMNS, BLOB otherwise.", 0) \
M(Bool, mysql_map_fixed_string_to_text_in_show_columns, false, "If enabled, FixedString type will be mapped to TEXT in SHOW [FULL] COLUMNS, BLOB otherwise.", 0) \
M(Bool, mysql_map_string_to_text_in_show_columns, true, "If enabled, String type will be mapped to TEXT in SHOW [FULL] COLUMNS, BLOB otherwise. Has an effect only when the connection is made through the MySQL wire protocol.", 0) \
M(Bool, mysql_map_fixed_string_to_text_in_show_columns, true, "If enabled, FixedString type will be mapped to TEXT in SHOW [FULL] COLUMNS, BLOB otherwise. Has an effect only when the connection is made through the MySQL wire protocol.", 0) \
\
M(UInt64, optimize_min_equality_disjunction_chain_length, 3, "The minimum length of the expression `expr = x1 OR ... expr = xN` for optimization ", 0) \
M(UInt64, optimize_min_inequality_conjunction_chain_length, 3, "The minimum length of the expression `expr <> x1 AND ... expr <> xN` for optimization ", 0) \
@ -604,7 +604,7 @@ class IColumn;
M(Bool, validate_polygons, true, "Throw exception if polygon is invalid in function pointInPolygon (e.g. self-tangent, self-intersecting). If the setting is false, the function will accept invalid polygons but may silently return wrong result.", 0) \
M(UInt64, max_parser_depth, DBMS_DEFAULT_MAX_PARSER_DEPTH, "Maximum parser depth (recursion depth of recursive descend parser).", 0) \
M(Bool, allow_settings_after_format_in_insert, false, "Allow SETTINGS after FORMAT, but note, that this is not always safe (note: this is a compatibility setting).", 0) \
M(Seconds, periodic_live_view_refresh, DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC, "Interval after which periodically refreshed live view is forced to refresh.", 0) \
M(Seconds, periodic_live_view_refresh, 60, "Interval after which periodically refreshed live view is forced to refresh.", 0) \
M(Bool, transform_null_in, false, "If enabled, NULL values will be matched with 'IN' operator as if they are considered equal.", 0) \
M(Bool, allow_nondeterministic_mutations, false, "Allow non-deterministic functions in ALTER UPDATE/ALTER DELETE statements", 0) \
M(Seconds, lock_acquire_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "How long locking request should wait before failing", 0) \
@ -719,6 +719,7 @@ class IColumn;
M(Bool, query_plan_split_filter, true, "Allow to split filters in the query plan", 0) \
M(Bool, query_plan_merge_expressions, true, "Allow to merge expressions in the query plan", 0) \
M(Bool, query_plan_filter_push_down, true, "Allow to push down filter by predicate query plan step", 0) \
M(Bool, query_plan_optimize_prewhere, true, "Allow to push down filter to PREWHERE expression for supported storages", 0) \
M(Bool, query_plan_execute_functions_after_sorting, true, "Allow to re-order functions after sorting", 0) \
M(Bool, query_plan_reuse_storage_ordering_for_window_functions, true, "Allow to use the storage sorting for window functions", 0) \
M(Bool, query_plan_lift_up_union, true, "Allow to move UNIONs up so that more parts of the query plan can be optimized", 0) \
@ -841,6 +842,7 @@ class IColumn;
M(Bool, use_with_fill_by_sorting_prefix, true, "Columns preceding WITH FILL columns in ORDER BY clause form sorting prefix. Rows with different values in sorting prefix are filled independently", 0) \
M(Bool, optimize_uniq_to_count, true, "Rewrite uniq and its variants(except uniqUpTo) to count if subquery has distinct or group by clause.", 0) \
M(Bool, use_variant_as_common_type, false, "Use Variant as a result type for if/multiIf in case when there is no common type for arguments", 0) \
M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0) \
\
/** Experimental functions */ \
M(Bool, allow_experimental_materialized_postgresql_table, false, "Allows to use the MaterializedPostgreSQL table engine. Disabled by default, because this feature is experimental", 0) \
@ -872,7 +874,6 @@ class IColumn;
M(UInt64, cache_warmer_threads, 4, "Only available in ClickHouse Cloud", 0) \
M(Int64, ignore_cold_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Int64, prefer_warmed_unmerged_parts_seconds, 0, "Only available in ClickHouse Cloud", 0) \
M(Bool, enable_order_by_all, true, "Enable sorting expression ORDER BY ALL.", 0) \
M(Bool, iceberg_engine_ignore_schema_evolution, false, "Ignore schema evolution in Iceberg table engine and read all data using latest schema saved on table creation. Note that it can lead to incorrect result", 0) \
// End of COMMON_SETTINGS

View File

@ -78,7 +78,8 @@ namespace SettingsChangesHistory
/// History of settings changes that controls some backward incompatible changes
/// across all ClickHouse versions. It maps ClickHouse version to settings changes that were done
/// in this version. This history contains both changes to existing settings and newly added settings.
/// Settings changes is a vector of structs {setting_name, previous_value, new_value}.
/// Settings changes is a vector of structs
/// {setting_name, previous_value, new_value, reason}.
/// For newly added setting choose the most appropriate previous_value (for example, if new setting
/// controls new feature and it's 'true' by default, use 'false' as previous_value).
/// It's used to implement `compatibility` setting (see https://github.com/ClickHouse/ClickHouse/issues/35972)
@ -87,6 +88,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"24.2", {
{"output_format_values_escape_quote_with_quote", false, false, "If true escape ' with '', otherwise quoted with \\'"},
{"input_format_try_infer_exponent_floats", true, false, "Don't infer floats in exponential notation by default"},
{"query_plan_optimize_prewhere", true, true, "Allow to push down filter to PREWHERE expression for supported storages"},
{"async_insert_max_data_size", 1000000, 10485760, "The previous value appeared to be too small."},
{"async_insert_poll_timeout_ms", 10, 10, "Timeout in milliseconds for polling data from asynchronous insert queue"},
{"async_insert_use_adaptive_busy_timeout", true, true, "Use adaptive asynchronous insert timeout"},
@ -103,6 +105,8 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"min_external_table_block_size_bytes", DEFAULT_INSERT_BLOCK_SIZE * 256, DEFAULT_INSERT_BLOCK_SIZE * 256, "Squash blocks passed to external table to specified size in bytes, if blocks are not big enough."},
{"parallel_replicas_prefer_local_join", true, true, "If true, and JOIN can be executed with parallel replicas algorithm, and all storages of right JOIN part are *MergeTree, local JOIN will be used instead of GLOBAL JOIN."},
{"extract_key_value_pairs_max_pairs_per_row", 0, 0, "Max number of pairs that can be produced by the `extractKeyValuePairs` function. Used as a safeguard against consuming too much memory."},
{"mysql_map_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
{"mysql_map_fixed_string_to_text_in_show_columns", false, true, "Reduce the configuration effort to connect ClickHouse with BI tools."},
}},
{"24.1", {{"print_pretty_type_names", false, true, "Better user experience."},
{"input_format_json_read_bools_as_strings", false, true, "Allow to read bools as strings in JSON formats by default"},

View File

@ -45,7 +45,7 @@ public:
bool isVariadic() const override { return true; }
size_t getNumberOfArguments() const override { return 0; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return false; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const ColumnsWithTypeAndName & arguments) const override
{

View File

@ -8,7 +8,7 @@
#include <memory>
#include <boost/algorithm/string.hpp>
#include <boost/algorithm/string/split.hpp>
#include <Poco/URI.h>
@ -45,7 +45,7 @@ String getSSEAndSignedHeaders(const Poco::Net::MessageHeader & message_header)
String content;
for (const auto & [header_name, header_value] : message_header)
{
if (boost::algorithm::starts_with(header_name, "x-amz-server-side-encryption"))
if (header_name.starts_with("x-amz-server-side-encryption"))
{
content += header_name + ": " + header_value + "\n";
}
@ -55,7 +55,7 @@ String getSSEAndSignedHeaders(const Poco::Net::MessageHeader & message_header)
boost::split(parts, header_value, [](char c){ return c == ' '; });
for (const auto & part : parts)
{
if (boost::algorithm::starts_with(part, "SignedHeaders="))
if (part.starts_with("SignedHeaders="))
content += header_name + ": ... " + part + " ...\n";
}
}

View File

@ -29,6 +29,7 @@
#include <Storages/MergeTree/ReplicatedFetchList.h>
#include <Storages/MergeTree/MergeTreeData.h>
#include <Storages/MergeTree/MergeTreeSettings.h>
#include <Storages/Distributed/DistributedSettings.h>
#include <Storages/CompressionCodecSelector.h>
#include <Storages/StorageS3Settings.h>
#include <Disks/DiskLocal.h>
@ -112,6 +113,7 @@
#include <Parsers/FunctionParameterValuesVisitor.h>
#include <Parsers/ASTSelectWithUnionQuery.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <base/defines.h>
namespace fs = std::filesystem;
@ -353,6 +355,7 @@ struct ContextSharedPart : boost::noncopyable
std::optional<MergeTreeSettings> merge_tree_settings TSA_GUARDED_BY(mutex); /// Settings of MergeTree* engines.
std::optional<MergeTreeSettings> replicated_merge_tree_settings TSA_GUARDED_BY(mutex); /// Settings of ReplicatedMergeTree* engines.
std::optional<DistributedSettings> distributed_settings TSA_GUARDED_BY(mutex);
std::atomic_size_t max_table_size_to_drop = 50000000000lu; /// Protects MergeTree tables from accidental DROP (50GB by default)
std::atomic_size_t max_partition_size_to_drop = 50000000000lu; /// Protects MergeTree partitions from accidental DROP (50GB by default)
/// No lock required for format_schema_path modified only during initialization
@ -4118,6 +4121,21 @@ const MergeTreeSettings & Context::getReplicatedMergeTreeSettings() const
return *shared->replicated_merge_tree_settings;
}
const DistributedSettings & Context::getDistributedSettings() const
{
std::lock_guard lock(shared->mutex);
if (!shared->distributed_settings)
{
const auto & config = shared->getConfigRefWithLock(lock);
DistributedSettings distributed_settings;
distributed_settings.loadFromConfig("distributed", config);
shared->distributed_settings.emplace(distributed_settings);
}
return *shared->distributed_settings;
}
const StorageS3Settings & Context::getStorageS3Settings() const
{
std::lock_guard lock(shared->mutex);

View File

@ -113,6 +113,7 @@ class BlobStorageLog;
class IAsynchronousReader;
class IOUringReader;
struct MergeTreeSettings;
struct DistributedSettings;
struct InitialAllRangesAnnouncement;
struct ParallelReadRequest;
struct ParallelReadResponse;
@ -1075,6 +1076,7 @@ public:
const MergeTreeSettings & getMergeTreeSettings() const;
const MergeTreeSettings & getReplicatedMergeTreeSettings() const;
const DistributedSettings & getDistributedSettings() const;
const StorageS3Settings & getStorageS3Settings() const;
/// Prevents DROP TABLE if its size is greater than max_size (50GB by default, max_size=0 turn off this check)

View File

@ -60,8 +60,7 @@ BlockIO InterpreterAlterQuery::execute()
{
return executeToDatabase(alter);
}
else if (alter.alter_object == ASTAlterQuery::AlterObjectType::TABLE
|| alter.alter_object == ASTAlterQuery::AlterObjectType::LIVE_VIEW)
else if (alter.alter_object == ASTAlterQuery::AlterObjectType::TABLE)
{
return executeToTable(alter);
}
@ -467,11 +466,6 @@ AccessRightsElements InterpreterAlterQuery::getRequiredAccessForCommand(const AS
required_access.emplace_back(AccessType::ALTER_VIEW_MODIFY_REFRESH, database, table);
break;
}
case ASTAlterCommand::LIVE_VIEW_REFRESH:
{
required_access.emplace_back(AccessType::ALTER_VIEW_REFRESH, database, table);
break;
}
case ASTAlterCommand::RENAME_COLUMN:
{
required_access.emplace_back(AccessType::ALTER_RENAME_COLUMN, database, table, column_name());

View File

@ -604,7 +604,7 @@ InterpreterSelectQuery::InterpreterSelectQuery(
query.setFinal();
}
auto analyze = [&] ()
auto analyze = [&] (bool try_move_to_prewhere)
{
/// Allow push down and other optimizations for VIEW: replace with subquery and rewrite it.
ASTPtr view_table;
@ -635,6 +635,38 @@ InterpreterSelectQuery::InterpreterSelectQuery(
view = nullptr;
}
if (try_move_to_prewhere
&& storage && storage->canMoveConditionsToPrewhere()
&& query.where() && !query.prewhere()
&& !query.hasJoin()) /// Join may produce rows with nulls or default values, it's difficult to analyze if they affected or not.
{
/// PREWHERE optimization: transfer some condition from WHERE to PREWHERE if enabled and viable
if (const auto & column_sizes = storage->getColumnSizes(); !column_sizes.empty())
{
/// Extract column compressed sizes.
std::unordered_map<std::string, UInt64> column_compressed_sizes;
for (const auto & [name, sizes] : column_sizes)
column_compressed_sizes[name] = sizes.data_compressed;
SelectQueryInfo current_info;
current_info.query = query_ptr;
current_info.syntax_analyzer_result = syntax_analyzer_result;
Names queried_columns = syntax_analyzer_result->requiredSourceColumns();
const auto & supported_prewhere_columns = storage->supportedPrewhereColumns();
MergeTreeWhereOptimizer where_optimizer{
std::move(column_compressed_sizes),
metadata_snapshot,
storage->getConditionEstimatorByPredicate(query_info, storage_snapshot, context),
queried_columns,
supported_prewhere_columns,
log};
where_optimizer.optimize(current_info, context);
}
}
if (query.prewhere() && query.where())
{
/// Filter block in WHERE instead to get better performance
@ -748,7 +780,8 @@ InterpreterSelectQuery::InterpreterSelectQuery(
result_header = getSampleBlockImpl();
};
analyze();
/// Conditionally support AST-based PREWHERE optimization.
analyze(shouldMoveToPrewhere() && (!settings.query_plan_optimize_prewhere || !settings.query_plan_enable_optimizations));
bool need_analyze_again = false;
bool can_analyze_again = false;
@ -792,7 +825,9 @@ InterpreterSelectQuery::InterpreterSelectQuery(
/// Reuse already built sets for multiple passes of analysis
prepared_sets = query_analyzer->getPreparedSets();
analyze();
/// Do not try move conditions to PREWHERE for the second time.
/// Otherwise, we won't be able to fallback from inefficient PREWHERE to WHERE later.
analyze(/* try_move_to_prewhere = */ false);
}
/// If there is no WHERE, filter blocks as usual
@ -2491,7 +2526,8 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
}
else if (storage)
{
if (typeid_cast<const StorageMerge *>(storage.get()))
if (shouldMoveToPrewhere() && settings.query_plan_optimize_prewhere && settings.query_plan_enable_optimizations
&& typeid_cast<const StorageMerge *>(storage.get()))
collectFiltersForAnalysis(query_ptr, context, storage_snapshot, options, query_info);
/// Table.

View File

@ -342,11 +342,6 @@ bool MutationsInterpreter::Source::hasProjection(const String & name) const
return part && part->hasProjection(name);
}
bool MutationsInterpreter::Source::hasBrokenProjection(const String & name) const
{
return part && part->hasBrokenProjection(name);
}
bool MutationsInterpreter::Source::isCompactPart() const
{
return part && part->getType() == MergeTreeDataPartType::Compact;
@ -812,7 +807,7 @@ void MutationsInterpreter::prepare(bool dry_run)
{
mutation_kind.set(MutationKind::MUTATE_INDEX_STATISTIC_PROJECTION);
const auto & projection = projections_desc.get(command.projection_name);
if (!source.hasProjection(projection.name) || source.hasBrokenProjection(projection.name))
if (!source.hasProjection(projection.name))
{
for (const auto & column : projection.required_columns)
dependencies.emplace(column, ColumnDependency::PROJECTION);
@ -999,13 +994,6 @@ void MutationsInterpreter::prepare(bool dry_run)
if (!source.hasProjection(projection.name))
continue;
/// Always rebuild broken projections.
if (source.hasBrokenProjection(projection.name))
{
materialized_projections.insert(projection.name);
continue;
}
if (need_rebuild_projections)
{
materialized_projections.insert(projection.name);

View File

@ -126,7 +126,6 @@ public:
bool materializeTTLRecalculateOnly() const;
bool hasSecondaryIndex(const String & name) const;
bool hasProjection(const String & name) const;
bool hasBrokenProjection(const String & name) const;
bool isCompactPart() const;
void read(

View File

@ -466,10 +466,6 @@ void ASTAlterCommand::formatImpl(const FormatSettings & settings, FormatState &
<< (settings.hilite ? hilite_none : "");
refresh->formatImpl(settings, state, frame);
}
else if (type == ASTAlterCommand::LIVE_VIEW_REFRESH)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "REFRESH " << (settings.hilite ? hilite_none : "");
}
else if (type == ASTAlterCommand::RENAME_COLUMN)
{
settings.ostr << (settings.hilite ? hilite_keyword : "") << "RENAME COLUMN " << (if_exists ? "IF EXISTS " : "")
@ -621,9 +617,6 @@ void ASTAlterQuery::formatQueryImpl(const FormatSettings & settings, FormatState
case AlterObjectType::DATABASE:
settings.ostr << "ALTER DATABASE ";
break;
case AlterObjectType::LIVE_VIEW:
settings.ostr << "ALTER LIVE VIEW ";
break;
default:
break;
}

View File

@ -17,8 +17,6 @@ namespace DB
* MODIFY COLUMN col_name type,
* DROP PARTITION partition,
* COMMENT_COLUMN col_name 'comment',
* ALTER LIVE VIEW [db.]name_type
* REFRESH
*/
class ASTAlterCommand : public IAST
@ -79,8 +77,6 @@ public:
NO_TYPE,
LIVE_VIEW_REFRESH,
MODIFY_DATABASE_SETTING,
MODIFY_COMMENT,
@ -242,7 +238,6 @@ public:
{
TABLE,
DATABASE,
LIVE_VIEW,
UNKNOWN,
};

View File

@ -63,9 +63,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
ParserKeyword s_add("ADD");
ParserKeyword s_drop("DROP");
ParserKeyword s_suspend("SUSPEND");
ParserKeyword s_resume("RESUME");
ParserKeyword s_refresh("REFRESH");
ParserKeyword s_modify("MODIFY");
ParserKeyword s_attach_partition("ATTACH PARTITION");
@ -175,16 +172,6 @@ bool ParserAlterCommand::parseImpl(Pos & pos, ASTPtr & node, Expected & expected
switch (alter_object)
{
case ASTAlterQuery::AlterObjectType::LIVE_VIEW:
{
if (s_refresh.ignore(pos, expected))
{
command->type = ASTAlterCommand::LIVE_VIEW_REFRESH;
}
else
return false;
break;
}
case ASTAlterQuery::AlterObjectType::DATABASE:
{
if (s_modify_setting.ignore(pos, expected))
@ -986,7 +973,6 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
ParserKeyword s_alter_table("ALTER TABLE");
ParserKeyword s_alter_temporary_table("ALTER TEMPORARY TABLE");
ParserKeyword s_alter_live_view("ALTER LIVE VIEW");
ParserKeyword s_alter_database("ALTER DATABASE");
ASTAlterQuery::AlterObjectType alter_object_type;
@ -995,10 +981,6 @@ bool ParserAlterQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
alter_object_type = ASTAlterQuery::AlterObjectType::TABLE;
}
else if (s_alter_live_view.ignore(pos, expected))
{
alter_object_type = ASTAlterQuery::AlterObjectType::LIVE_VIEW;
}
else if (s_alter_database.ignore(pos, expected))
{
alter_object_type = ASTAlterQuery::AlterObjectType::DATABASE;

View File

@ -28,8 +28,6 @@ namespace DB
* [DROP INDEX [IF EXISTS] index_name]
* [CLEAR INDEX [IF EXISTS] index_name IN PARTITION partition]
* [MATERIALIZE INDEX [IF EXISTS] index_name [IN PARTITION partition]]
* ALTER LIVE VIEW [db.name]
* [REFRESH]
*/
class ParserAlterQuery : public IParserBase

View File

@ -890,7 +890,7 @@ bool ParserCreateLiveViewQuery::parseImpl(Pos & pos, ASTPtr & node, Expected & e
if (ParserKeyword{"REFRESH"}.ignore(pos, expected) || ParserKeyword{"PERIODIC REFRESH"}.ignore(pos, expected))
{
if (!ParserNumber{}.parse(pos, live_view_periodic_refresh, expected))
live_view_periodic_refresh = std::make_shared<ASTLiteral>(static_cast<UInt64>(DEFAULT_PERIODIC_LIVE_VIEW_REFRESH_SEC));
live_view_periodic_refresh = std::make_shared<ASTLiteral>(static_cast<UInt64>(60));
with_periodic_refresh = true;
}

View File

@ -22,6 +22,8 @@ QueryPlanOptimizationSettings QueryPlanOptimizationSettings::fromSettings(const
settings.filter_push_down = from.query_plan_enable_optimizations && from.query_plan_filter_push_down;
settings.optimize_prewhere = from.query_plan_enable_optimizations && from.query_plan_optimize_prewhere;
settings.execute_functions_after_sorting = from.query_plan_enable_optimizations && from.query_plan_execute_functions_after_sorting;
settings.reuse_storage_ordering_for_window_functions = from.query_plan_enable_optimizations && from.query_plan_reuse_storage_ordering_for_window_functions;

View File

@ -61,6 +61,8 @@ struct QueryPlanOptimizationSettings
/// If remove-redundant-distinct-steps optimization is enabled.
bool remove_redundant_distinct = true;
bool optimize_prewhere = true;
/// If reading from projection can be applied
bool optimize_projection = false;
bool force_use_projection = false;

View File

@ -118,7 +118,8 @@ void optimizeTreeSecondPass(const QueryPlanOptimizationSettings & optimization_s
/// NOTE: optimizePrewhere can modify the stack.
/// Prewhere optimization relies on PK optimization (getConditionEstimatorByPredicate)
optimizePrewhere(stack, nodes);
if (optimization_settings.optimize_prewhere)
optimizePrewhere(stack, nodes);
auto & frame = stack.back();

View File

@ -223,7 +223,7 @@ bool analyzeProjectionCandidate(
{
const auto & created_projections = part_with_ranges.data_part->getProjectionParts();
auto it = created_projections.find(candidate.projection->name);
if (it != created_projections.end() && !it->second->is_broken)
if (it != created_projections.end())
{
projection_parts.push_back(it->second);
}

View File

@ -548,7 +548,6 @@ Chain buildPushingToViewsChain(
result_chain.addSource(std::move(sink));
}
/// TODO: add pushing to live view
if (result_chain.empty())
result_chain.addSink(std::make_shared<NullSinkToStorage>(storage_header));

View File

@ -461,6 +461,12 @@ void MySQLHandler::comQuery(ReadBuffer & payload, bool binary_protocol)
auto query_context = session->makeQueryContext();
query_context->setCurrentQueryId(fmt::format("mysql:{}:{}", connection_id, toString(UUIDHelpers::generateV4())));
/// --- Workaround for Bug 56173. Can be removed when the analyzer is on by default.
auto settings = query_context->getSettings();
settings.prefer_column_name_to_alias = true;
query_context->setSettings(settings);
CurrentThread::QueryScope query_scope{query_context};
std::atomic<size_t> affected_rows {0};

View File

@ -15,6 +15,27 @@ namespace ErrorCodes
IMPLEMENT_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
void DistributedSettings::loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config)
{
if (!config.has(config_elem))
return;
Poco::Util::AbstractConfiguration::Keys config_keys;
config.keys(config_elem, config_keys);
try
{
for (const String & key : config_keys)
set(key, config.getString(config_elem + "." + key));
}
catch (Exception & e)
{
if (e.code() == ErrorCodes::UNKNOWN_SETTING)
e.addMessage("in Distributed config");
throw;
}
}
void DistributedSettings::loadFromQuery(ASTStorage & storage_def)
{
if (storage_def.settings)

View File

@ -37,6 +37,7 @@ DECLARE_SETTINGS_TRAITS(DistributedSettingsTraits, LIST_OF_DISTRIBUTED_SETTINGS)
*/
struct DistributedSettings : public BaseSettings<DistributedSettingsTraits>
{
void loadFromConfig(const String & config_elem, const Poco::Util::AbstractConfiguration & config);
void loadFromQuery(ASTStorage & storage_def);
};

View File

@ -335,9 +335,7 @@ void DataPartStorageOnDiskBase::backup(
const ReadSettings & read_settings,
bool make_temporary_hard_links,
BackupEntries & backup_entries,
TemporaryFilesOnDisks * temp_dirs,
bool is_projection_part,
bool allow_backup_broken_projection) const
TemporaryFilesOnDisks * temp_dirs) const
{
fs::path part_path_on_disk = fs::path{root_path} / part_dir;
fs::path part_path_in_backup = fs::path{path_in_backup} / part_dir;
@ -379,7 +377,7 @@ void DataPartStorageOnDiskBase::backup(
bool copy_encrypted = !backup_settings.decrypt_files_from_encrypted_disks;
auto backup_file = [&](const String & filepath)
for (const auto & filepath : files_to_backup)
{
auto filepath_on_disk = part_path_on_disk / filepath;
auto filepath_in_backup = part_path_in_backup / filepath;
@ -387,10 +385,8 @@ void DataPartStorageOnDiskBase::backup(
if (files_without_checksums.contains(filepath))
{
backup_entries.emplace_back(filepath_in_backup, std::make_unique<BackupEntryFromSmallFile>(disk, filepath_on_disk, read_settings, copy_encrypted));
return;
continue;
}
else if (is_projection_part && allow_backup_broken_projection && !disk->exists(filepath_on_disk))
return;
if (make_temporary_hard_links)
{
@ -415,31 +411,6 @@ void DataPartStorageOnDiskBase::backup(
backup_entry = wrapBackupEntryWith(std::move(backup_entry), temp_dir_owner);
backup_entries.emplace_back(filepath_in_backup, std::move(backup_entry));
};
auto * log = &Poco::Logger::get("DataPartStorageOnDiskBase::backup");
for (const auto & filepath : files_to_backup)
{
if (is_projection_part && allow_backup_broken_projection)
{
try
{
backup_file(filepath);
}
catch (Exception & e)
{
if (e.code() != ErrorCodes::FILE_DOESNT_EXIST)
throw;
LOG_ERROR(log, "Cannot backup file {} of projection part {}. Will try to ignore it", filepath, part_dir);
continue;
}
}
else
{
backup_file(filepath);
}
}
}

View File

@ -58,9 +58,7 @@ public:
const ReadSettings & read_settings,
bool make_temporary_hard_links,
BackupEntries & backup_entries,
TemporaryFilesOnDisks * temp_dirs,
bool is_projection_part,
bool allow_backup_broken_projection) const override;
TemporaryFilesOnDisks * temp_dirs) const override;
MutableDataPartStoragePtr freeze(
const std::string & to,

View File

@ -223,9 +223,7 @@ public:
const ReadSettings & read_settings,
bool make_temporary_hard_links,
BackupEntries & backup_entries,
TemporaryFilesOnDisks * temp_dirs,
bool is_projection_part,
bool allow_backup_broken_projection) const = 0;
TemporaryFilesOnDisks * temp_dirs) const = 0;
/// Creates hardlinks into 'to/dir_path' for every file in data part.
/// Callback is called after hardlinks are created, but before 'delete-on-destroy.txt' marker is removed.

View File

@ -699,14 +699,13 @@ void IMergeTreeDataPart::loadColumnsChecksumsIndexes(bool require_columns_checks
calculateColumnsAndSecondaryIndicesSizesOnDisk();
loadRowsCount(); /// Must be called after loadIndexGranularity() as it uses the value of `index_granularity`.
loadPartitionAndMinMaxIndex();
bool has_broken_projections = false;
if (!parent_part)
{
loadTTLInfos();
loadProjections(require_columns_checksums, check_consistency, has_broken_projections, false /* if_not_loaded */);
loadProjections(require_columns_checksums, check_consistency, false /* if_not_loaded */);
}
if (check_consistency && !has_broken_projections)
if (check_consistency)
checkConsistency(require_columns_checksums);
loadDefaultCompressionCodec();
@ -771,7 +770,7 @@ void IMergeTreeDataPart::addProjectionPart(
projection_parts[projection_name] = std::move(projection_part);
}
void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool check_consistency, bool & has_broken_projection, bool if_not_loaded)
void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool check_consistency, bool if_not_loaded)
{
auto metadata_snapshot = storage.getInMemoryMetadataPtr();
for (const auto & projection : metadata_snapshot->projections)
@ -788,34 +787,10 @@ void IMergeTreeDataPart::loadProjections(bool require_columns_checksums, bool ch
else
{
auto part = getProjectionPartBuilder(projection.name).withPartFormatFromDisk().build();
try
{
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
auto message = getCurrentExceptionMessage(true);
LOG_ERROR(&Poco::Logger::get("IMergeTreeDataPart"),
"Cannot load projection {}, will consider it broken. Reason: {}", projection.name, message);
has_broken_projection = true;
part->setBrokenReason(message, getCurrentExceptionCode());
}
part->loadColumnsChecksumsIndexes(require_columns_checksums, check_consistency);
addProjectionPart(projection.name, std::move(part));
}
}
else if (checksums.has(path))
{
auto part = getProjectionPartBuilder(projection.name).withPartFormatFromDisk().build();
part->setBrokenReason("Projection directory " + path + " does not exist while loading projections", ErrorCodes::NO_FILE_IN_DATA_PART);
addProjectionPart(projection.name, std::move(part));
has_broken_projection = true;
}
}
}
@ -1214,8 +1189,7 @@ void IMergeTreeDataPart::loadChecksums(bool require)
/// Check the data while we are at it.
LOG_WARNING(storage.log, "Checksums for part {} not found. Will calculate them from data on disk.", name);
bool noop;
checksums = checkDataPart(shared_from_this(), false, noop, /* is_cancelled */[]{ return false; }, /* throw_on_broken_projection */false);
checksums = checkDataPart(shared_from_this(), false);
writeChecksums(checksums, {});
bytes_on_disk = checksums.getTotalSizeOnDisk();
@ -2222,32 +2196,6 @@ std::optional<String> IMergeTreeDataPart::getStreamNameForColumn(
return getStreamNameOrHash(stream_name, extension, storage_);
}
void IMergeTreeDataPart::markProjectionPartAsBroken(const String & projection_name, const String & message, int code) const
{
auto it = projection_parts.find(projection_name);
if (it == projection_parts.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no projection part '{}'", projection_name);
it->second->setBrokenReason(message, code);
}
bool IMergeTreeDataPart::hasBrokenProjection(const String & projection_name) const
{
auto it = projection_parts.find(projection_name);
if (it == projection_parts.end())
return false;
return it->second->is_broken;
}
void IMergeTreeDataPart::setBrokenReason(const String & message, int code) const
{
std::lock_guard lock(broken_reason_mutex);
if (is_broken)
return;
is_broken = true;
exception = message;
exception_code = code;
}
bool isCompactPart(const MergeTreeDataPartPtr & data_part)
{
return (data_part && data_part->getType() == MergeTreeDataPartType::Compact);

View File

@ -259,12 +259,6 @@ public:
/// Frozen by ALTER TABLE ... FREEZE ... It is used for information purposes in system.parts table.
mutable std::atomic<bool> is_frozen {false};
/// If it is a projection part, it can be broken sometimes.
mutable std::atomic<bool> is_broken {false};
mutable std::string exception;
mutable int exception_code = 0;
mutable std::mutex broken_reason_mutex;
/// Indicates that the part was marked Outdated by PartCheckThread because the part was not committed to ZooKeeper
mutable bool is_unexpected_local_part = false;
@ -424,16 +418,9 @@ public:
void addProjectionPart(const String & projection_name, std::shared_ptr<IMergeTreeDataPart> && projection_part);
void markProjectionPartAsBroken(const String & projection_name, const String & message, int code) const;
bool hasProjection(const String & projection_name) const { return projection_parts.contains(projection_name); }
bool hasBrokenProjection(const String & projection_name) const;
/// Return true, if all projections were loaded successfully and none was marked as broken.
void loadProjections(bool require_columns_checksums, bool check_consistency, bool & has_broken_projection, bool if_not_loaded = false);
void setBrokenReason(const String & message, int code) const;
void loadProjections(bool require_columns_checksums, bool check_consistency, bool if_not_loaded = false);
/// Return set of metadata file names without checksums. For example,
/// columns.txt or checksums.txt itself.
@ -593,7 +580,7 @@ protected:
const IMergeTreeDataPart * parent_part;
String parent_part_name;
mutable std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
std::map<String, std::shared_ptr<IMergeTreeDataPart>> projection_parts;
mutable PartMetadataManagerPtr metadata_manager;

View File

@ -731,9 +731,8 @@ bool MergeTask::MergeProjectionsStage::mergeMinMaxIndexAndPrepareProjections() c
MergeTreeData::DataPartsVector projection_parts;
for (const auto & part : global_ctx->future_part->parts)
{
auto actual_projection_parts = part->getProjectionParts();
auto it = actual_projection_parts.find(projection.name);
if (it != actual_projection_parts.end() && !it->second->is_broken)
auto it = part->getProjectionParts().find(projection.name);
if (it != part->getProjectionParts().end())
projection_parts.push_back(it->second);
}
if (projection_parts.size() < global_ctx->future_part->parts.size())

View File

@ -5311,7 +5311,7 @@ MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
if (hold_table_lock && !table_lock)
table_lock = lockForShare(local_context->getCurrentQueryId(), local_context->getSettingsRef().lock_acquire_timeout);
if (backup_settings.check_projection_parts)
if (backup_settings.check_parts)
part->checkConsistencyWithProjections(/* require_part_metadata= */ true);
BackupEntries backup_entries_from_part;
@ -5323,8 +5323,7 @@ MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
read_settings,
make_temporary_hard_links,
backup_entries_from_part,
&temp_dirs,
false, false);
&temp_dirs);
auto projection_parts = part->getProjectionParts();
for (const auto & [projection_name, projection_part] : projection_parts)
@ -5337,9 +5336,7 @@ MergeTreeData::PartsBackupEntries MergeTreeData::backupParts(
read_settings,
make_temporary_hard_links,
backup_entries_from_part,
&temp_dirs,
projection_part->is_broken,
backup_settings.allow_backup_broken_projections);
&temp_dirs);
}
if (hold_storage_and_part_ptrs)
@ -7828,39 +7825,21 @@ MovePartsOutcome MergeTreeData::moveParts(const CurrentlyMovingPartsTaggerPtr &
bool MergeTreeData::partsContainSameProjections(const DataPartPtr & left, const DataPartPtr & right, String & out_reason)
{
auto remove_broken_parts_from_consideration = [](auto & parts)
{
std::set<String> broken_projection_parts;
for (const auto & [name, part] : parts)
{
if (part->is_broken)
broken_projection_parts.emplace(name);
}
for (const auto & name : broken_projection_parts)
parts.erase(name);
};
auto left_projection_parts = left->getProjectionParts();
auto right_projection_parts = right->getProjectionParts();
remove_broken_parts_from_consideration(left_projection_parts);
remove_broken_parts_from_consideration(right_projection_parts);
if (left_projection_parts.size() != right_projection_parts.size())
if (left->getProjectionParts().size() != right->getProjectionParts().size())
{
out_reason = fmt::format(
"Parts have different number of projections: {} in part '{}' and {} in part '{}'",
left_projection_parts.size(),
left->getProjectionParts().size(),
left->name,
right_projection_parts.size(),
right->getProjectionParts().size(),
right->name
);
return false;
}
for (const auto & [name, _] : left_projection_parts)
for (const auto & [name, _] : left->getProjectionParts())
{
if (!right_projection_parts.contains(name))
if (!right->hasProjection(name))
{
out_reason = fmt::format(
"The part '{}' doesn't have projection '{}' while part '{}' does", right->name, name, left->name

View File

@ -468,13 +468,8 @@ public:
struct ProjectionPartsVector
{
DataPartsVector data_parts;
DataPartsVector projection_parts;
DataPartStateVector projection_parts_states;
DataPartsVector broken_projection_parts;
DataPartStateVector broken_projection_parts_states;
DataPartsVector data_parts;
};
/// Returns a copy of the list so that the caller shouldn't worry about locks.
@ -489,7 +484,7 @@ public:
const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Same as above but only returns projection parts
ProjectionPartsVector getProjectionPartsVectorForInternalUsage(
const DataPartStates & affordable_states, MergeTreeData::DataPartStateVector * out_states) const;
const DataPartStates & affordable_states, DataPartStateVector * out_states = nullptr) const;
/// Returns absolutely all parts (and snapshot of their states)

View File

@ -54,8 +54,6 @@ struct MergeTreeDataPartChecksums
bool has(const String & file_name) const { return files.find(file_name) != files.end(); }
bool remove(const String & file_name) { return files.erase(file_name); }
bool empty() const { return files.empty(); }
/// Checks that the set of columns and their checksums are the same. If not, throws an exception.

View File

@ -573,9 +573,7 @@ static std::set<ProjectionDescriptionRawPtr> getProjectionsToRecalculate(
{
bool need_recalculate =
materialized_projections.contains(projection.name)
|| (!is_full_part_storage
&& source_part->hasProjection(projection.name)
&& !source_part->hasBrokenProjection(projection.name));
|| (!is_full_part_storage && source_part->hasProjection(projection.name));
if (need_recalculate)
projections_to_recalc.insert(&projection);
@ -919,8 +917,7 @@ void finalizeMutatedPart(
new_data_part->modification_time = time(nullptr);
/// Load rest projections which are hardlinked
bool noop;
new_data_part->loadProjections(false, false, noop, true /* if_not_loaded */);
new_data_part->loadProjections(false, false, true /* if_not_loaded */);
/// All information about sizes is stored in checksums.
/// It doesn't make sense to touch filesystem for sizes.
@ -1503,9 +1500,7 @@ private:
bool need_recalculate =
ctx->materialized_projections.contains(projection.name)
|| (!is_full_part_storage
&& ctx->source_part->hasProjection(projection.name)
&& !ctx->source_part->hasBrokenProjection(projection.name));
|| (!is_full_part_storage && ctx->source_part->hasProjection(projection.name));
if (need_recalculate)
{
@ -1642,9 +1637,8 @@ private:
void finalize()
{
bool noop;
ctx->new_data_part->minmax_idx = std::move(ctx->minmax_idx);
ctx->new_data_part->loadProjections(false, false, noop, true /* if_not_loaded */);
ctx->new_data_part->loadProjections(false, false, true /* if_not_loaded */);
ctx->mutating_executor.reset();
ctx->mutating_pipeline.reset();

View File

@ -63,7 +63,7 @@ void ReplicatedMergeTreePartCheckThread::enqueuePart(const String & name, time_t
if (parts_set.contains(name))
return;
LOG_TRACE(log, "Enqueueing {} for check after {}s", name, delay_to_check_seconds);
LOG_TRACE(log, "Enqueueing {} for check after after {}s", name, delay_to_check_seconds);
parts_queue.emplace_back(name, std::chrono::steady_clock::now() + std::chrono::seconds(delay_to_check_seconds));
parts_set.insert(name);
task->schedule();
@ -274,7 +274,7 @@ std::pair<bool, MergeTreeDataPartPtr> ReplicatedMergeTreePartCheckThread::findLo
return std::make_pair(exists_in_zookeeper, part);
}
ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const String & part_name, bool throw_on_broken_projection)
ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const String & part_name)
{
ReplicatedCheckResult result;
auto [exists_in_zookeeper, part] = findLocalPart(part_name);
@ -341,7 +341,6 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
/// before the ReplicatedMergeTreePartHeader was introduced.
String part_path = storage.replica_path + "/parts/" + part_name;
String part_znode = zookeeper->get(part_path);
bool is_broken_projection = false;
try
{
@ -363,10 +362,8 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
checkDataPart(
part,
/* require_checksums */true,
is_broken_projection,
[this] { return need_stop.load(); },
throw_on_broken_projection);
true,
[this] { return need_stop.load(); });
if (need_stop)
{
@ -385,27 +382,14 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
if (isRetryableException(std::current_exception()))
throw;
PreformattedMessage message;
if (is_broken_projection)
{
WriteBufferFromOwnString wb;
message = PreformattedMessage::create(
"Part {} has a broken projections. It will be ignored. Broken projections info: {}",
part_name, getCurrentExceptionMessage(false));
LOG_DEBUG(log, message);
result.action = ReplicatedCheckResult::DoNothing;
}
else
{
tryLogCurrentException(log, __PRETTY_FUNCTION__);
tryLogCurrentException(log, __PRETTY_FUNCTION__);
message = PreformattedMessage::create("Part {} looks broken. Removing it and will try to fetch.", part_name);
LOG_ERROR(log, message);
result.action = ReplicatedCheckResult::TryFetchMissing;
}
auto message = PreformattedMessage::create("Part {} looks broken. Removing it and will try to fetch.", part_name);
LOG_ERROR(log, message);
/// Part is broken, let's try to find it and fetch.
result.status = {part_name, false, message};
result.action = ReplicatedCheckResult::TryFetchMissing;
return result;
}
@ -435,12 +419,12 @@ ReplicatedCheckResult ReplicatedMergeTreePartCheckThread::checkPartImpl(const St
}
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after, bool throw_on_broken_projection)
CheckResult ReplicatedMergeTreePartCheckThread::checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after)
{
LOG_INFO(log, "Checking part {}", part_name);
ProfileEvents::increment(ProfileEvents::ReplicatedPartChecks);
ReplicatedCheckResult result = checkPartImpl(part_name, throw_on_broken_projection);
ReplicatedCheckResult result = checkPartImpl(part_name);
switch (result.action)
{
case ReplicatedCheckResult::None: UNREACHABLE();
@ -593,7 +577,7 @@ void ReplicatedMergeTreePartCheckThread::run()
}
std::optional<time_t> recheck_after;
checkPartAndFix(selected->name, &recheck_after, /* throw_on_broken_projection */false);
checkPartAndFix(selected->name, &recheck_after);
if (need_stop)
return;

View File

@ -65,9 +65,9 @@ public:
size_t size() const;
/// Check part by name
CheckResult checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after = nullptr, bool throw_on_broken_projection = true);
CheckResult checkPartAndFix(const String & part_name, std::optional<time_t> * recheck_after = nullptr);
ReplicatedCheckResult checkPartImpl(const String & part_name, bool throw_on_broken_projection);
ReplicatedCheckResult checkPartImpl(const String & part_name);
std::unique_lock<std::mutex> pausePartsCheck();

View File

@ -43,7 +43,6 @@ namespace ErrorCodes
extern const int NO_FILE_IN_DATA_PART;
extern const int NETWORK_ERROR;
extern const int SOCKET_TIMEOUT;
extern const int BROKEN_PROJECTION;
}
@ -118,9 +117,7 @@ static IMergeTreeDataPart::Checksums checkDataPart(
const NameSet & files_without_checksums,
const ReadSettings & read_settings,
bool require_checksums,
std::function<bool()> is_cancelled,
bool & is_broken_projection,
bool throw_on_broken_projection)
std::function<bool()> is_cancelled)
{
/** Responsibility:
* - read list of columns from columns.txt;
@ -129,7 +126,6 @@ static IMergeTreeDataPart::Checksums checkDataPart(
*/
CurrentMetrics::Increment metric_increment{CurrentMetrics::ReplicatedChecks};
Poco::Logger * log = &Poco::Logger::get("checkDataPart");
NamesAndTypesList columns_txt;
@ -279,55 +275,17 @@ static IMergeTreeDataPart::Checksums checkDataPart(
}
}
std::string broken_projections_message;
for (const auto & [name, projection] : data_part->getProjectionParts())
{
if (is_cancelled())
return {};
auto projection_file = name + ".proj";
if (!throw_on_broken_projection && projection->is_broken)
{
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
}
IMergeTreeDataPart::Checksums projection_checksums;
try
{
bool noop;
projection_checksums = checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
read_settings, require_checksums, is_cancelled, noop, /* throw_on_broken_projection */false);
}
catch (...)
{
if (isRetryableException(std::current_exception()))
throw;
if (!projection->is_broken)
{
LOG_TEST(log, "Marking projection {} as broken ({})", name, projection_file);
projection->setBrokenReason(getCurrentExceptionMessage(false), getCurrentExceptionCode());
}
is_broken_projection = true;
if (throw_on_broken_projection)
{
if (!broken_projections_message.empty())
broken_projections_message += "\n";
broken_projections_message += fmt::format(
"Part {} has a broken projection {} (error: {})",
data_part->name, name, getCurrentExceptionMessage(false));
continue;
}
projections_on_disk.erase(projection_file);
checksums_txt.remove(projection_file);
}
auto projection_checksums = checkDataPart(
projection, *data_part_storage.getProjection(projection_file),
projection->getColumns(), projection->getType(),
projection->getFileNamesWithoutChecksums(),
read_settings, require_checksums, is_cancelled);
checksums_data.files[projection_file] = IMergeTreeDataPart::Checksums::Checksum(
projection_checksums.getTotalSizeOnDisk(),
@ -336,11 +294,6 @@ static IMergeTreeDataPart::Checksums checkDataPart(
projections_on_disk.erase(projection_file);
}
if (throw_on_broken_projection && !broken_projections_message.empty())
{
throw Exception(ErrorCodes::BROKEN_PROJECTION, "{}", broken_projections_message);
}
if (require_checksums && !projections_on_disk.empty())
{
throw Exception(ErrorCodes::UNEXPECTED_FILE_IN_DATA_PART,
@ -368,9 +321,7 @@ IMergeTreeDataPart::Checksums checkDataPartInMemory(const DataPartInMemoryPtr &
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
bool require_checksums,
bool & is_broken_projection,
std::function<bool()> is_cancelled,
bool throw_on_broken_projection)
std::function<bool()> is_cancelled)
{
if (auto part_in_memory = asInMemoryPart(data_part))
return checkDataPartInMemory(part_in_memory);
@ -412,9 +363,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
is_cancelled);
};
try
@ -428,9 +377,7 @@ IMergeTreeDataPart::Checksums checkDataPart(
data_part->getFileNamesWithoutChecksums(),
read_settings,
require_checksums,
is_cancelled,
is_broken_projection,
throw_on_broken_projection);
is_cancelled);
}
catch (...)
{

View File

@ -10,9 +10,7 @@ namespace DB
IMergeTreeDataPart::Checksums checkDataPart(
MergeTreeData::DataPartPtr data_part,
bool require_checksums,
bool & is_broken_projection,
std::function<bool()> is_cancelled = []{ return false; },
bool throw_on_broken_projection = false);
std::function<bool()> is_cancelled = []{ return false; });
bool isNotEnoughMemoryErrorCode(int code);
bool isRetryableException(const std::exception_ptr exception_ptr);

View File

@ -1892,7 +1892,7 @@ void registerStorageDistributed(StorageFactory & factory)
}
/// TODO: move some arguments from the arguments to the SETTINGS.
DistributedSettings distributed_settings;
DistributedSettings distributed_settings = context->getDistributedSettings();
if (args.storage_def->settings)
{
distributed_settings.loadFromQuery(*args.storage_def);

View File

@ -2315,12 +2315,11 @@ std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPt
{
/// If the checksums file is not present, calculate the checksums and write them to disk.
static constexpr auto checksums_path = "checksums.txt";
bool noop;
if (part->isStoredOnDisk() && !part->getDataPartStorage().exists(checksums_path))
{
try
{
auto calculated_checksums = checkDataPart(part, false, noop, /* is_cancelled */[]{ return false; }, /* throw_on_broken_projection */true);
auto calculated_checksums = checkDataPart(part, false);
calculated_checksums.checkEqual(part->checksums, true);
auto & part_mutable = const_cast<IMergeTreeDataPart &>(*part);
@ -2341,7 +2340,7 @@ std::optional<CheckResult> StorageMergeTree::checkDataNext(DataValidationTasksPt
{
try
{
checkDataPart(part, true, noop, /* is_cancelled */[]{ return false; }, /* throw_on_broken_projection */true);
checkDataPart(part, true);
return CheckResult(part->name, true, "");
}
catch (...)

View File

@ -8891,11 +8891,12 @@ IStorage::DataValidationTasksPtr StorageReplicatedMergeTree::getCheckTaskList(
std::optional<CheckResult> StorageReplicatedMergeTree::checkDataNext(DataValidationTasksPtr & check_task_list)
{
if (auto part = assert_cast<DataValidationTasks *>(check_task_list.get())->next())
{
try
{
return part_check_thread.checkPartAndFix(part->name, /* recheck_after */nullptr, /* throw_on_broken_projection */true);
return CheckResult(part_check_thread.checkPartAndFix(part->name));
}
catch (const Exception & ex)
{

View File

@ -8,7 +8,6 @@
#include <Common/Throttler.h>
#include <Common/formatReadable.h>
#include <Interpreters/Context.h>
#include <boost/algorithm/string/predicate.hpp>
namespace DB
@ -303,7 +302,7 @@ S3Settings StorageS3Settings::getSettings(const String & endpoint, const String
{
std::advance(possible_prefix_setting, -1);
const auto & [endpoint_prefix, settings] = *possible_prefix_setting;
if (boost::algorithm::starts_with(endpoint, endpoint_prefix) && settings.auth_settings.canBeUsedByUser(user))
if (endpoint.starts_with(endpoint_prefix) && settings.auth_settings.canBeUsedByUser(user))
return possible_prefix_setting->second;
}

View File

@ -83,11 +83,7 @@ StorageSystemProjectionParts::StorageSystemProjectionParts(const StorageID & tab
{"rows_where_ttl_info.expression", std::make_shared<DataTypeArray>(std::make_shared<DataTypeString>())},
{"rows_where_ttl_info.min", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())},
{"is_broken", std::make_shared<DataTypeUInt8>()},
{"exception_code", std::make_shared<DataTypeInt32>()},
{"exception", std::make_shared<DataTypeString>()},
{"rows_where_ttl_info.max", std::make_shared<DataTypeArray>(std::make_shared<DataTypeDateTime>())}
}
)
{
@ -276,38 +272,12 @@ void StorageSystemProjectionParts::processNextStorage(
add_ttl_info_map(part->ttl_infos.moves_ttl);
if (columns_mask[src_index++])
{
if (part->default_codec)
columns[res_index++]->insert(queryToString(part->default_codec->getCodecDesc()));
else
columns[res_index++]->insertDefault();
}
columns[res_index++]->insert(queryToString(part->default_codec->getCodecDesc()));
add_ttl_info_map(part->ttl_infos.recompression_ttl);
add_ttl_info_map(part->ttl_infos.group_by_ttl);
add_ttl_info_map(part->ttl_infos.rows_where_ttl);
{
if (columns_mask[src_index++])
columns[res_index++]->insert(part->is_broken.load(std::memory_order_relaxed));
if (part->is_broken)
{
std::lock_guard lock(part->broken_reason_mutex);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->exception_code);
if (columns_mask[src_index++])
columns[res_index++]->insert(part->exception);
}
else
{
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
if (columns_mask[src_index++])
columns[res_index++]->insertDefault();
}
}
/// _state column should be the latest.
/// Do not use part->getState*, it can be changed from different thread
if (has_state_column)

View File

@ -168,13 +168,13 @@ public:
/// Type of path to be fetched
enum class ZkPathType
{
Exact, /// Fetch all nodes under this path
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
Recurse, /// Fatch all nodes under this path, recursively
Exact, /// Fetch all nodes under this path
Prefix, /// Fetch all nodes starting with this prefix, recursively (multiple paths may match prefix)
Recurse, /// Fetch all nodes under this path, recursively
};
/// List of paths to be feched from zookeeper
using Paths = std::deque<std::pair<String, ZkPathType>>;
/// List of paths to be fetched from zookeeper
using Paths = std::unordered_map<String, ZkPathType>;
class ReadFromSystemZooKeeper final : public SourceStepWithFilter
{
@ -226,6 +226,7 @@ private:
ContextPtr context;
ZooKeeperWithFaultInjection::Ptr zookeeper;
bool started = false;
std::unordered_set<String> visited;
};
@ -375,7 +376,8 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
size_t size = values->size();
for (size_t row = 0; row < size; ++row)
res.emplace_back(values->getDataAt(row).toString(), ZkPathType::Exact);
/// Only inserted if the key doesn't exists already
res.insert({values->getDataAt(row).toString(), ZkPathType::Exact});
}
else if (function_name == "equals")
{
@ -395,7 +397,8 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
if (value->column->size() != 1)
return;
res.emplace_back(value->column->getDataAt(0).toString(), ZkPathType::Exact);
/// Only inserted if the key doesn't exists already
res.insert({value->column->getDataAt(0).toString(), ZkPathType::Exact});
}
else if (allow_unrestricted && function_name == "like")
{
@ -414,7 +417,7 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
String pattern = value->column->getDataAt(0).toString();
bool has_metasymbol = false;
String prefix; // pattern prefix before the first metasymbol occurrence
String prefix{}; // pattern prefix before the first metasymbol occurrence
for (size_t i = 0; i < pattern.size(); i++)
{
char c = pattern[i];
@ -440,7 +443,7 @@ static void extractPathImpl(const ActionsDAG::Node & node, Paths & res, ContextP
prefix.append(1, c);
}
res.emplace_back(prefix, has_metasymbol ? ZkPathType::Prefix : ZkPathType::Exact);
res.insert_or_assign(prefix, has_metasymbol ? ZkPathType::Prefix : ZkPathType::Exact);
}
}
@ -453,8 +456,17 @@ static Paths extractPath(const ActionsDAG::NodeRawConstPtrs & filter_nodes, Cont
for (const auto * node : filter_nodes)
extractPathImpl(*node, res, context, allow_unrestricted);
auto node1 = res.find("/");
auto node2 = res.find("");
if ((node1 != res.end() && node1->second != ZkPathType::Exact) || (node2 != res.end() && node2->second != ZkPathType::Exact))
{
/// If we are already searching everything recursively, remove all other nodes
res.clear();
res.insert({"/", ZkPathType::Recurse});
}
if (res.empty() && allow_unrestricted)
res.emplace_back("/", ZkPathType::Recurse);
res.insert({"/", ZkPathType::Recurse});
return res;
}
@ -521,7 +533,6 @@ Chunk SystemZooKeeperSource::generate()
String path_part;
};
std::vector<ListTask> list_tasks;
std::unordered_set<String> added;
while (!paths.empty())
{
if (query_status)
@ -541,8 +552,9 @@ Chunk SystemZooKeeperSource::generate()
std::vector<String> paths_to_list;
while (!paths.empty() && static_cast<Int64>(list_tasks.size()) < max_inflight_requests)
{
auto [path, path_type] = std::move(paths.front());
paths.pop_front();
auto node = paths.extract(paths.begin());
auto & path = node.key();
auto & path_type = node.mapped();
ListTask task;
task.path = path;
@ -623,7 +635,7 @@ Chunk SystemZooKeeperSource::generate()
// Deduplication
String key = list_task.path_part + '/' + get_task.node;
if (auto [it, inserted] = added.emplace(key); !inserted)
if (auto [it, inserted] = visited.emplace(key); !inserted)
continue;
const Coordination::Stat & stat = res.stat;
@ -649,7 +661,7 @@ Chunk SystemZooKeeperSource::generate()
if (list_task.path_type != ZkPathType::Exact && res.stat.numChildren > 0)
{
paths.emplace_back(key, ZkPathType::Recurse);
paths.insert_or_assign(key, ZkPathType::Recurse);
}
}
}

View File

@ -71,12 +71,9 @@ std::vector<size_t> TableFunctionExecutable::skipAnalysisForArguments(const Quer
const auto & table_function_node_arguments = table_function_node.getArguments().getNodes();
size_t table_function_node_arguments_size = table_function_node_arguments.size();
if (table_function_node_arguments_size <= 3)
return {};
std::vector<size_t> result_indexes;
result_indexes.reserve(table_function_node_arguments_size - 3);
for (size_t i = 3; i < table_function_node_arguments_size; ++i)
result_indexes.reserve(table_function_node_arguments_size - 2);
for (size_t i = 2; i < table_function_node_arguments_size; ++i)
result_indexes.push_back(i);
return result_indexes;

View File

@ -1,7 +1,6 @@
test_build_sets_from_multiple_threads/test.py::test_set
test_concurrent_backups_s3/test.py::test_concurrent_backups
test_distributed_type_object/test.py::test_distributed_type_object
test_executable_table_function/test.py::test_executable_function_input_python
test_merge_table_over_distributed/test.py::test_global_in
test_merge_table_over_distributed/test.py::test_select_table_name_from_merge_over_distributed
test_passing_max_partitions_to_read_remotely/test.py::test_default_database_on_cluster

View File

@ -123,6 +123,13 @@ class Cache:
local_s3_cache = Path(url[7:])
if local_s3_cache.is_file():
shutil.copy2(local_s3_cache, compressed_cache)
else:
logging.warning(
"The local cache file %s does not exist, creating empty directory",
local_s3_cache,
)
self.directory.mkdir(parents=True, exist_ok=True)
return
else:
download_build_with_progress(url, compressed_cache)
except DownloadException as e:
@ -155,7 +162,7 @@ class Cache:
logging.info("Remote cache %s already exist, won't reupload", s3_path)
return
logging.info("Compressing cargo cache")
logging.info("Compressing cache")
archive_path = self.temp_path / self.archive_name
compress_fast(self.directory, archive_path)
logging.info("Uploading %s to S3 path %s", archive_path, s3_path)

View File

@ -1,8 +1,5 @@
import argparse
import concurrent.futures
from copy import deepcopy
from dataclasses import asdict, dataclass
from enum import Enum
import json
import logging
import os
@ -11,17 +8,21 @@ import re
import subprocess
import sys
import time
from copy import deepcopy
from dataclasses import asdict, dataclass
from enum import Enum
from pathlib import Path
from typing import Any, Dict, List, Optional, Sequence, Set, Union
import docker_images_helper
import upload_result_helper
from build_check import get_release_or_pr
from ci_config import CI_CONFIG, Build, Labels, JobNames
from ci_config import CI_CONFIG, Build, JobNames, Labels
from ci_utils import GHActions, is_hex, normalize_string
from clickhouse_helper import (
CiLogsCredentials,
ClickHouseHelper,
InsertException,
get_instance_id,
get_instance_type,
prepare_tests_results_for_clickhouse,
@ -1504,7 +1505,10 @@ def _upload_build_profile_data(
profile_data_file.stat().st_size,
query,
)
ch_helper.insert_file(url, auth, query, profile_data_file)
try:
ch_helper.insert_file(url, auth, query, profile_data_file)
except InsertException:
logging.error("Failed to insert profile data for the build, continue")
query = f"""INSERT INTO binary_sizes
(
@ -1530,7 +1534,10 @@ def _upload_build_profile_data(
binary_sizes_file.stat().st_size,
query,
)
ch_helper.insert_file(url, auth, query, binary_sizes_file)
try:
ch_helper.insert_file(url, auth, query, binary_sizes_file)
except InsertException:
logging.error("Failed to insert binary_size_file for the build, continue")
def _run_test(job_name: str, run_command: str) -> int:

View File

@ -425,13 +425,14 @@ def set_mergeable_check(
) -> None:
commit.create_status(
context=MERGEABLE_NAME,
description=description,
description=format_description(description),
state=state,
target_url=GITHUB_JOB_URL(),
)
def update_mergeable_check(commit: Commit, pr_info: PRInfo, check_name: str) -> None:
"check if the check_name in REQUIRED_CHECKS and then trigger update"
not_run = (
pr_info.labels.intersection({SKIP_MERGEABLE_CHECK_LABEL, "release"})
or check_name not in REQUIRED_CHECKS
@ -445,7 +446,11 @@ def update_mergeable_check(commit: Commit, pr_info: PRInfo, check_name: str) ->
logging.info("Update Mergeable Check by %s", check_name)
statuses = get_commit_filtered_statuses(commit)
trigger_mergeable_check(commit, statuses)
def trigger_mergeable_check(commit: Commit, statuses: CommitStatuses) -> None:
"""calculate and update MERGEABLE_NAME"""
required_checks = [
status for status in statuses if status.context in REQUIRED_CHECKS
]

View File

@ -4,10 +4,14 @@ import argparse
import logging
import subprocess
import sys
from distutils.version import StrictVersion
from pathlib import Path
from typing import List, Tuple
# isort: off
from pip._vendor.packaging.version import Version
# isort: on
from build_download_helper import download_builds_filter
from docker_images_helper import DockerImage, get_docker_image, pull_image
from env_helper import REPORT_PATH, TEMP_PATH
@ -38,7 +42,7 @@ def process_glibc_check(log_path: Path, max_glibc_version: str) -> TestResults:
_, version = symbol_with_glibc.split("@GLIBC_")
if version == "PRIVATE":
test_results.append(TestResult(symbol_with_glibc, "FAIL"))
elif StrictVersion(version) > max_glibc_version:
elif Version(version) > Version(max_glibc_version):
test_results.append(TestResult(symbol_with_glibc, "FAIL"))
if not test_results:
test_results.append(TestResult("glibc check", "OK"))

View File

@ -1,13 +0,0 @@
<clickhouse>
<storage_configuration>
<disks>
<backups>
<type>local</type>
<path>/var/lib/clickhouse/disks/backups/</path>
</backups>
</disks>
</storage_configuration>
<backups>
<allowed_disk>backups</allowed_disk>
</backups>
</clickhouse>

View File

@ -1,576 +0,0 @@
import time
import pytest
import logging
import string
import random
from helpers.cluster import ClickHouseCluster
cluster = ClickHouseCluster(__file__)
@pytest.fixture(scope="module")
def cluster():
try:
cluster = ClickHouseCluster(__file__)
cluster.add_instance(
"node",
main_configs=["config.d/backups.xml"],
stay_alive=True,
with_zookeeper=True,
)
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def create_table(node, table, replica, data_prefix="", aggressive_merge=True):
if data_prefix == "":
data_prefix = table
if aggressive_merge:
vertical_merge_algorithm_min_rows_to_activate = 1
vertical_merge_algorithm_min_columns_to_activate = 1
max_parts_to_merge_at_once = 3
else:
vertical_merge_algorithm_min_rows_to_activate = 100000
vertical_merge_algorithm_min_columns_to_activate = 100
max_parts_to_merge_at_once = 3
node.query(
f"""
DROP TABLE IF EXISTS {table} SYNC;
CREATE TABLE {table}
(
a String,
b String,
c Int64,
d Int64,
e Int64,
PROJECTION proj1
(
SELECT c ORDER BY d
),
PROJECTION proj2
(
SELECT d ORDER BY c
)
)
ENGINE = ReplicatedMergeTree('/test_broken_projection_{data_prefix}/data/', '{replica}') ORDER BY a
SETTINGS min_bytes_for_wide_part = 0,
max_parts_to_merge_at_once={max_parts_to_merge_at_once},
enable_vertical_merge_algorithm=0,
vertical_merge_algorithm_min_rows_to_activate = {vertical_merge_algorithm_min_rows_to_activate},
vertical_merge_algorithm_min_columns_to_activate = {vertical_merge_algorithm_min_columns_to_activate},
compress_primary_key=0;
"""
)
def insert(node, table, offset, size):
node.query(
f"""
INSERT INTO {table}
SELECT number, number, number, number, number%2 FROM numbers({offset}, {size})
SETTINGS insert_keeper_fault_injection_probability=0.0;
"""
)
def get_parts(node, table):
return (
node.query(
f"""
SELECT name
FROM system.parts
WHERE table='{table}' AND database=currentDatabase() AND active = 1
ORDER BY name;"
"""
)
.strip()
.split("\n")
)
def bash(node, command):
node.exec_in_container(["bash", "-c", command], privileged=True, user="root")
def break_projection(node, table, part, parent_part, break_type):
part_path = node.query(
f"""
SELECT path
FROM system.projection_parts
WHERE table='{table}'
AND database=currentDatabase()
AND active=1
AND part_name='{part}'
AND parent_name='{parent_part}'
ORDER BY modification_time DESC
LIMIT 1;
"""
).strip()
node.query(
f"select throwIf(substring('{part_path}', 1, 1) != '/', 'Path is relative: {part_path}')"
)
if break_type == "data":
bash(node, f"rm '{part_path}/d.bin'")
bash(node, f"rm '{part_path}/c.bin'")
elif break_type == "metadata":
bash(node, f"rm '{part_path}/columns.txt'")
elif break_type == "part":
bash(node, f"rm -r '{part_path}'")
def break_part(node, table, part):
part_path = node.query(
f"""
SELECT path
FROM system.parts
WHERE table='{table}'
AND database=currentDatabase()
AND active=1
AND part_name='{part}'
ORDER BY modification_time DESC
LIMIT 1;
"""
).strip()
node.query(
f"select throwIf(substring('{part_path}', 1, 1) != '/', 'Path is relative: {part_path}')"
)
bash(node, f"rm '{part_path}/columns.txt'")
def get_broken_projections_info(node, table):
return node.query(
f"""
SELECT parent_name, name, errors.name FROM
(
SELECT parent_name, name, exception_code
FROM system.projection_parts
WHERE table='{table}'
AND database=currentDatabase()
AND is_broken = 1
) AS parts_info
INNER JOIN system.errors AS errors
ON parts_info.exception_code = errors.code
ORDER BY parent_name, name
"""
).strip()
def get_projections_info(node, table):
return node.query(
f"""
SELECT parent_name, name, is_broken
FROM system.projection_parts
WHERE table='{table}'
AND active = 1
AND database=currentDatabase()
ORDER BY parent_name, name
"""
).strip()
def optimize(node, table, final, no_wait):
query = f"OPTIMIZE TABLE {table}"
if final:
query += " FINAL"
if no_wait:
query += " SETTINGS alter_sync=0"
node.query(query)
def reattach(node, table):
node.query(
f"""
DETACH TABLE {table};
ATTACH TABLE {table};
"""
)
def materialize_projection(node, table, proj):
node.query(
f"ALTER TABLE {table} MATERIALIZE PROJECTION {proj} SETTINGS mutations_sync=2"
)
def check_table_full(node, table):
return node.query(
f"CHECK TABLE {table} SETTINGS check_query_single_value_result = 0;"
).strip()
def random_str(length=6):
alphabet = string.ascii_lowercase + string.digits
return "".join(random.SystemRandom().choice(alphabet) for _ in range(length))
def check(node, table, check_result, expect_broken_part="", expected_error=""):
if expect_broken_part == "proj1":
assert expected_error in node.query_and_get_error(
f"SELECT c FROM '{table}' WHERE d == 12 ORDER BY c"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT c FROM '{table}' WHERE d == 12 ORDER BY c)"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
if res == "":
res = node.query(
"""
SELECT query_id, query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log ORDER BY query_start_time_microseconds DESC
"""
)
print(f"LOG: {res}")
assert False
assert "proj1" in res
if expect_broken_part == "proj2":
assert expected_error in node.query_and_get_error(
f"SELECT d FROM '{table}' WHERE c == 12 ORDER BY d"
)
else:
query_id = node.query(
f"SELECT queryID() FROM (SELECT d FROM '{table}' WHERE c == 12 ORDER BY d)"
).strip()
node.query("SYSTEM FLUSH LOGS")
res = node.query(
f"""
SELECT query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log
WHERE query_id='{query_id}' AND type='QueryFinish'
"""
)
if res == "":
res = node.query(
"""
SELECT query_id, query, splitByChar('.', arrayJoin(projections))[-1]
FROM system.query_log ORDER BY query_start_time_microseconds DESC
"""
)
print(f"LOG: {res}")
assert False
assert "proj2" in res
assert check_result == int(node.query(f"CHECK TABLE {table}"))
def test_broken_ignored(cluster):
node = cluster.instances["node"]
table_name = "test1"
create_table(node, table_name, 1)
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
# Break metadata (columns.txt) file of projection 'proj1'
break_projection(node, table_name, "proj1", "all_2_2_0", "metadata")
# Do select and after "check table" query.
# Select works because it does not read columns.txt.
# But expect check table result as 0.
check(node, table_name, 0)
# Projection 'proj1' from part all_2_2_0 will now appear in broken parts info
# because it was marked broken during "check table" query.
assert "all_2_2_0\tproj1\tFILE_DOESNT_EXIST" in get_broken_projections_info(
node, table_name
)
# Check table query will also show a list of parts which have broken projections.
assert "all_2_2_0" in check_table_full(node, table_name)
# Break data file of projection 'proj2' for part all_2_2_0
break_projection(node, table_name, "proj2", "all_2_2_0", "data")
# It will not yet appear in broken projections info.
assert "proj2" not in get_broken_projections_info(node, table_name)
# Select now fails with error "File doesn't exist"
check(node, table_name, 0, "proj2", "FILE_DOESNT_EXIST")
# Projection 'proj2' from part all_2_2_0 will now appear in broken parts info.
assert "all_2_2_0\tproj2\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
node, table_name
)
# Second select works, because projection is now marked as broken.
check(node, table_name, 0)
# Break data file of projection 'proj2' for part all_3_3_0
break_projection(node, table_name, "proj2", "all_3_3_0", "data")
# It will not yet appear in broken projections info.
assert "all_3_3_0" not in get_broken_projections_info(node, table_name)
insert(node, table_name, 20, 5)
insert(node, table_name, 25, 5)
# Part all_3_3_0 has 'proj' and 'proj2' projections, but 'proj2' is broken and server does NOT know it yet.
# Parts all_4_4_0 and all_5_5_0 have both non-broken projections.
# So a merge will be create for future part all_3_5_1.
# During merge it will fail to read from 'proj2' of part all_3_3_0 and proj2 will be marked broken.
# Merge will be retried and on second attempt it will succeed.
# The result part all_3_5_1 will have only 1 projection - 'proj', because
# it will skip 'proj2' as it will see that one part does not have it anymore in the set of valid projections.
optimize(node, table_name, 0, 1)
time.sleep(5)
# table_uuid=node.query(f"SELECT uuid FROM system.tables WHERE table='{table_name}' and database=currentDatabase()").strip()
# assert 0 < int(
# node.query(
# f"""
# SYSTEM FLUSH LOGS;
# SELECT count() FROM system.text_log
# WHERE level='Error'
# AND logger_name='MergeTreeBackgroundExecutor'
# AND message like 'Exception while executing background task %{table_uuid}:all_3_5_1%%Cannot open file%proj2.proj/c.bin%'
# """)
# )
assert "all_3_3_0" in get_broken_projections_info(node, table_name)
check(node, table_name, 0)
def test_materialize_broken_projection(cluster):
node = cluster.instances["node"]
table_name = "test2"
create_table(node, table_name, 1)
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
break_projection(node, table_name, "proj1", "all_1_1_0", "metadata")
reattach(node, table_name)
assert "all_1_1_0\tproj1\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
node, table_name
)
assert "Part all_1_1_0 has a broken projection proj1" in check_table_full(
node, table_name
)
break_projection(node, table_name, "proj2", "all_1_1_0", "data")
reattach(node, table_name)
assert "all_1_1_0\tproj2\tFILE_DOESNT_EXIST" in get_broken_projections_info(
node, table_name
)
assert "Part all_1_1_0 has a broken projection proj2" in check_table_full(
node, table_name
)
materialize_projection(node, table_name, "proj1")
assert "has a broken projection" not in check_table_full(node, table_name)
def test_broken_ignored_replicated(cluster):
node = cluster.instances["node"]
table_name = "test3"
table_name2 = "test3_replica"
create_table(node, table_name, 1)
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
check(node, table_name, 1)
create_table(node, table_name2, 2, table_name)
check(node, table_name2, 1)
break_projection(node, table_name, "proj1", "all_0_0_0", "data")
assert "Part all_0_0_0 has a broken projection proj1" in check_table_full(
node, table_name
)
break_part(node, table_name, "all_0_0_0")
node.query(f"SYSTEM SYNC REPLICA {table_name}")
assert "has a broken projection" not in check_table_full(node, table_name)
def get_random_string(string_length=8):
alphabet = string.ascii_letters + string.digits
return "".join((random.choice(alphabet) for _ in range(string_length)))
def test_broken_projections_in_backups_1(cluster):
node = cluster.instances["node"]
table_name = "test4"
create_table(node, table_name, 1, aggressive_merge=False, data_prefix=table_name)
node.query("SYSTEM STOP MERGES")
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
check(node, table_name, 1)
break_projection(node, table_name, "proj1", "all_2_2_0", "data")
check(node, table_name, 0, "proj1", "FILE_DOESNT_EXIST")
assert "all_2_2_0\tproj1\tNO_FILE_IN_DATA_PART" in get_broken_projections_info(
node, table_name
)
backup_name = f"b1-{get_random_string()}"
assert "BACKUP_CREATED" in node.query(
f"""
set backup_restore_keeper_fault_injection_probability=0.0;
backup table {table_name} to Disk('backups', '{backup_name}') settings check_projection_parts=false;
"""
)
assert "RESTORED" in node.query(
f"""
drop table {table_name} sync;
set backup_restore_keeper_fault_injection_probability=0.0;
restore table {table_name} from Disk('backups', '{backup_name}');
"""
)
node.query("SYSTEM STOP MERGES")
check(node, table_name, 1)
assert "" == get_broken_projections_info(node, table_name)
def test_broken_projections_in_backups_2(cluster):
node = cluster.instances["node"]
table_name = "test5"
create_table(node, table_name, 1, aggressive_merge=False, data_prefix=table_name)
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
check(node, table_name, 1)
break_projection(node, table_name, "proj2", "all_2_2_0", "part")
check(node, table_name, 0, "proj2", "ErrnoException")
assert "all_2_2_0\tproj2\tFILE_DOESNT_EXIST" == get_broken_projections_info(
node, table_name
)
assert "FILE_DOESNT_EXIST" in node.query_and_get_error(
f"""
set backup_restore_keeper_fault_injection_probability=0.0;
backup table {table_name} to Disk('backups', 'b2')
"""
)
materialize_projection(node, table_name, "proj2")
check(node, table_name, 1)
backup_name = f"b3-{get_random_string()}"
assert "BACKUP_CREATED" in node.query(
f"""
set backup_restore_keeper_fault_injection_probability=0.0;
backup table {table_name} to Disk('backups', '{backup_name}') settings check_projection_parts=false;
"""
)
assert "RESTORED" in node.query(
f"""
drop table {table_name} sync;
set backup_restore_keeper_fault_injection_probability=0.0;
restore table {table_name} from Disk('backups', '{backup_name}');
"""
)
check(node, table_name, 1)
def test_broken_projections_in_backups_3(cluster):
node = cluster.instances["node"]
table_name = "test6"
create_table(node, table_name, 1, aggressive_merge=False, data_prefix=table_name)
node.query("SYSTEM STOP MERGES")
insert(node, table_name, 0, 5)
insert(node, table_name, 5, 5)
insert(node, table_name, 10, 5)
insert(node, table_name, 15, 5)
assert ["all_0_0_0", "all_1_1_0", "all_2_2_0", "all_3_3_0"] == get_parts(
node, table_name
)
check(node, table_name, 1)
break_projection(node, table_name, "proj1", "all_1_1_0", "part")
assert "Part all_1_1_0 has a broken projection proj1" in check_table_full(
node, table_name
)
assert "all_1_1_0\tproj1\tFILE_DOESNT_EXIST" == get_broken_projections_info(
node, table_name
)
backup_name = f"b4-{get_random_string()}"
assert "BACKUP_CREATED" in node.query(
f"""
set backup_restore_keeper_fault_injection_probability=0.0;
backup table {table_name} to Disk('backups', '{backup_name}') settings check_projection_parts=false, allow_backup_broken_projections=true;
"""
)
assert "RESTORED" in node.query(
f"""
drop table {table_name} sync;
set backup_restore_keeper_fault_injection_probability=0.0;
restore table {table_name} from Disk('backups', '{backup_name}');
"""
)
check(node, table_name, 0)
assert "all_1_1_0\tproj1\tNO_FILE_IN_DATA_PART" == get_broken_projections_info(
node, table_name
)

View File

@ -0,0 +1,5 @@
<clickhouse>
<distributed>
<flush_on_detach>0</flush_on_detach>
</distributed>
</clickhouse>

View File

@ -0,0 +1,49 @@
import pytest
from helpers.cluster import ClickHouseCluster
import logging
cluster = ClickHouseCluster(__file__)
node = cluster.add_instance("node", main_configs=["configs/overrides.xml"])
@pytest.fixture(scope="module")
def start_cluster():
try:
logging.info("Starting cluster...")
cluster.start()
logging.info("Cluster started")
yield cluster
finally:
cluster.shutdown()
def test_distibuted_settings(start_cluster):
node.query("")
node.query(
"""
CREATE TABLE data_1 (key Int) ENGINE Memory();
CREATE TABLE dist_1 as data_1 ENGINE Distributed(default, default, data_1) SETTINGS flush_on_detach = true;
SYSTEM STOP DISTRIBUTED SENDS dist_1;
INSERT INTO dist_1 SETTINGS prefer_localhost_replica=0 VALUES (1);
DETACH TABLE dist_1;
"""
)
assert "flush_on_detach = 1" in node.query("SHOW CREATE dist_1")
# flush_on_detach=true, so data_1 should have 1 row
assert int(node.query("SELECT count() FROM data_1")) == 1
node.query(
"""
CREATE TABLE data_2 (key Int) ENGINE Memory();
CREATE TABLE dist_2 as data_2 ENGINE Distributed(default, default, data_2);
SYSTEM STOP DISTRIBUTED SENDS dist_2;
INSERT INTO dist_2 SETTINGS prefer_localhost_replica=0 VALUES (2);
DETACH TABLE dist_2;
"""
)
## Settings are not added to CREATE (only specific one, like index_granularity for MergeTree)
# assert "flush_on_detach = 0" in node.query("SHOW CREATE dist_2")
# But settins are applied (flush_on_detach=false in config, so data_2 should not have any rows)
assert int(node.query("SELECT count() FROM data_2")) == 0

View File

@ -46,7 +46,6 @@ ALTER DATABASE SETTINGS ['ALTER DATABASE SETTING','ALTER MODIFY DATABASE SETTING
ALTER NAMED COLLECTION [] NAMED_COLLECTION NAMED COLLECTION ADMIN
ALTER TABLE [] \N ALTER
ALTER DATABASE [] \N ALTER
ALTER VIEW REFRESH ['ALTER LIVE VIEW REFRESH','REFRESH VIEW'] VIEW ALTER VIEW
ALTER VIEW MODIFY QUERY ['ALTER TABLE MODIFY QUERY'] VIEW ALTER VIEW
ALTER VIEW MODIFY REFRESH ['ALTER TABLE MODIFY QUERY'] VIEW ALTER VIEW
ALTER VIEW [] \N ALTER

View File

@ -686,9 +686,6 @@ CREATE TABLE system.projection_parts
`rows_where_ttl_info.expression` Array(String),
`rows_where_ttl_info.min` Array(DateTime),
`rows_where_ttl_info.max` Array(DateTime),
`is_broken` UInt8,
`exception_code` Int32,
`exception` String,
`bytes` UInt64 ALIAS bytes_on_disk,
`marks_size` UInt64 ALIAS marks_bytes,
`part_name` String ALIAS name

View File

@ -13,7 +13,7 @@ ${CLICKHOUSE_CLIENT} -n --query="CREATE TABLE sample_table (
)
ENGINE ReplicatedMergeTree('/clickhouse/$CLICKHOUSE_TEST_ZOOKEEPER_PREFIX/02221_system_zookeeper_unrestricted_like', '1')
ORDER BY tuple();
DROP TABLE IF EXISTS sample_table;"
DROP TABLE IF EXISTS sample_table SYNC;"
${CLICKHOUSE_CLIENT} -n --query "CREATE TABLE sample_table_2 (

View File

@ -123,7 +123,7 @@ expect -- "| dt_tz2 | DATETIME | NO | | NULL | |
expect -- "| enm | TEXT | NO | | NULL | |"
expect -- "| f32 | FLOAT | NO | | NULL | |"
expect -- "| f64 | DOUBLE | NO | | NULL | |"
expect -- "| fs | BLOB | NO | | NULL | |"
expect -- "| fs | TEXT | NO | | NULL | |"
expect -- "| i128 | TEXT | NO | | NULL | |"
expect -- "| i16 | SMALLINT | NO | | NULL | |"
expect -- "| i256 | TEXT | NO | | NULL | |"
@ -132,74 +132,8 @@ expect -- "| i64 | BIGINT | NO | | NULL | |
expect -- "| i8 | TINYINT | NO | | NULL | |"
expect -- "| ip4 | TEXT | NO | | NULL | |"
expect -- "| ip6 | TEXT | NO | | NULL | |"
expect -- "| lfs | BLOB | NO | | NULL | |"
expect -- "| lnfs | BLOB | YES | | NULL | |"
expect -- "| lns | BLOB | YES | | NULL | |"
expect -- "| ls | BLOB | NO | | NULL | |"
expect -- "| m | JSON | NO | | NULL | |"
expect -- "| m_complex | JSON | NO | | NULL | |"
expect -- "| mpg | TEXT | NO | | NULL | |"
expect -- "| ndt64 | DATETIME | YES | | NULL | |"
expect -- "| ndt64_tz | DATETIME | YES | | NULL | |"
expect -- "| nested.col1 | TEXT | NO | | NULL | |"
expect -- "| nested.col2 | TEXT | NO | | NULL | |"
expect -- "| nfs | BLOB | YES | | NULL | |"
expect -- "| ns | BLOB | YES | | NULL | |"
expect -- "| o | JSON | NO | | NULL | |"
expect -- "| p | TEXT | NO | | NULL | |"
expect -- "| pg | TEXT | NO | | NULL | |"
expect -- "| r | TEXT | NO | | NULL | |"
expect -- "| s | BLOB | NO | | NULL | |"
expect -- "| sagg | TEXT | NO | | NULL | |"
expect -- "| t | JSON | NO | | NULL | |"
expect -- "| ui128 | TEXT | NO | | NULL | |"
expect -- "| ui16 | SMALLINT UNSIGNED | NO | | NULL | |"
expect -- "| ui256 | TEXT | NO | | NULL | |"
expect -- "| ui32 | INTEGER UNSIGNED | NO | | NULL | |"
expect -- "| ui64 | BIGINT UNSIGNED | NO | | NULL | |"
expect -- "| ui8 | TINYINT UNSIGNED | NO | | NULL | |"
expect -- "| uuid | CHAR | NO | | NULL | |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
send -- "SHOW COLUMNS FROM tab SETTINGS mysql_map_string_to_text_in_show_columns=1;\r"
expect -- "+---------------+-------------------+------+------+---------+-------+"
expect -- "| field | type | null | key | default | extra |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
expect -- "| a | TEXT | NO | | NULL | |"
expect -- "| agg | TEXT | NO | | NULL | |"
expect -- "| b | TINYINT | NO | | NULL | |"
expect -- "| d | DATE | NO | | NULL | |"
expect -- "| d32 | DATE | NO | | NULL | |"
expect -- "| dec128 | DECIMAL(38, 2) | NO | | NULL | |"
expect -- "| dec128_native | DECIMAL(35, 30) | NO | | NULL | |"
expect -- "| dec128_text | TEXT | NO | | NULL | |"
expect -- "| dec256 | TEXT | NO | | NULL | |"
expect -- "| dec256_native | DECIMAL(65, 2) | NO | | NULL | |"
expect -- "| dec256_text | TEXT | NO | | NULL | |"
expect -- "| dec32 | DECIMAL(9, 2) | NO | | NULL | |"
expect -- "| dec64 | DECIMAL(18, 2) | NO | | NULL | |"
expect -- "| dt | DATETIME | NO | | NULL | |"
expect -- "| dt64 | DATETIME | NO | | NULL | |"
expect -- "| dt64_3_tz1 | DATETIME | NO | | NULL | |"
expect -- "| dt64_3_tz2 | DATETIME | NO | | NULL | |"
expect -- "| dt64_6 | DATETIME | NO | | NULL | |"
expect -- "| dt64_9 | DATETIME | NO | | NULL | |"
expect -- "| dt_tz1 | DATETIME | NO | | NULL | |"
expect -- "| dt_tz2 | DATETIME | NO | | NULL | |"
expect -- "| enm | TEXT | NO | | NULL | |"
expect -- "| f32 | FLOAT | NO | | NULL | |"
expect -- "| f64 | DOUBLE | NO | | NULL | |"
expect -- "| fs | BLOB | NO | | NULL | |"
expect -- "| i128 | TEXT | NO | | NULL | |"
expect -- "| i16 | SMALLINT | NO | | NULL | |"
expect -- "| i256 | TEXT | NO | | NULL | |"
expect -- "| i32 | INTEGER | NO | | NULL | |"
expect -- "| i64 | BIGINT | NO | | NULL | |"
expect -- "| i8 | TINYINT | NO | | NULL | |"
expect -- "| ip4 | TEXT | NO | | NULL | |"
expect -- "| ip6 | TEXT | NO | | NULL | |"
expect -- "| lfs | BLOB | NO | | NULL | |"
expect -- "| lnfs | BLOB | YES | | NULL | |"
expect -- "| lfs | TEXT | NO | | NULL | |"
expect -- "| lnfs | TEXT | YES | | NULL | |"
expect -- "| lns | TEXT | YES | | NULL | |"
expect -- "| ls | TEXT | NO | | NULL | |"
expect -- "| m | JSON | NO | | NULL | |"
@ -209,7 +143,7 @@ expect -- "| ndt64 | DATETIME | YES | | NULL | |
expect -- "| ndt64_tz | DATETIME | YES | | NULL | |"
expect -- "| nested.col1 | TEXT | NO | | NULL | |"
expect -- "| nested.col2 | TEXT | NO | | NULL | |"
expect -- "| nfs | BLOB | YES | | NULL | |"
expect -- "| nfs | TEXT | YES | | NULL | |"
expect -- "| ns | TEXT | YES | | NULL | |"
expect -- "| o | JSON | NO | | NULL | |"
expect -- "| p | TEXT | NO | | NULL | |"
@ -227,7 +161,7 @@ expect -- "| ui8 | TINYINT UNSIGNED | NO | | NULL | |
expect -- "| uuid | CHAR | NO | | NULL | |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
send -- "SHOW COLUMNS FROM tab SETTINGS mysql_map_fixed_string_to_text_in_show_columns=1;\r"
send -- "SHOW COLUMNS FROM tab SETTINGS mysql_map_string_to_text_in_show_columns=0;\r"
expect -- "+---------------+-------------------+------+------+---------+-------+"
expect -- "| field | type | null | key | default | extra |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
@ -293,6 +227,73 @@ expect -- "| ui8 | TINYINT UNSIGNED | NO | | NULL | |
expect -- "| uuid | CHAR | NO | | NULL | |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
send -- "SHOW COLUMNS FROM tab SETTINGS mysql_map_fixed_string_to_text_in_show_columns=0;\r"
expect -- "+---------------+-------------------+------+------+---------+-------+"
expect -- "| field | type | null | key | default | extra |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
expect -- "| a | TEXT | NO | | NULL | |"
expect -- "| agg | TEXT | NO | | NULL | |"
expect -- "| b | TINYINT | NO | | NULL | |"
expect -- "| d | DATE | NO | | NULL | |"
expect -- "| d32 | DATE | NO | | NULL | |"
expect -- "| dec128 | DECIMAL(38, 2) | NO | | NULL | |"
expect -- "| dec128_native | DECIMAL(35, 30) | NO | | NULL | |"
expect -- "| dec128_text | TEXT | NO | | NULL | |"
expect -- "| dec256 | TEXT | NO | | NULL | |"
expect -- "| dec256_native | DECIMAL(65, 2) | NO | | NULL | |"
expect -- "| dec256_text | TEXT | NO | | NULL | |"
expect -- "| dec32 | DECIMAL(9, 2) | NO | | NULL | |"
expect -- "| dec64 | DECIMAL(18, 2) | NO | | NULL | |"
expect -- "| dt | DATETIME | NO | | NULL | |"
expect -- "| dt64 | DATETIME | NO | | NULL | |"
expect -- "| dt64_3_tz1 | DATETIME | NO | | NULL | |"
expect -- "| dt64_3_tz2 | DATETIME | NO | | NULL | |"
expect -- "| dt64_6 | DATETIME | NO | | NULL | |"
expect -- "| dt64_9 | DATETIME | NO | | NULL | |"
expect -- "| dt_tz1 | DATETIME | NO | | NULL | |"
expect -- "| dt_tz2 | DATETIME | NO | | NULL | |"
expect -- "| enm | TEXT | NO | | NULL | |"
expect -- "| f32 | FLOAT | NO | | NULL | |"
expect -- "| f64 | DOUBLE | NO | | NULL | |"
expect -- "| fs | BLOB | NO | | NULL | |"
expect -- "| i128 | TEXT | NO | | NULL | |"
expect -- "| i16 | SMALLINT | NO | | NULL | |"
expect -- "| i256 | TEXT | NO | | NULL | |"
expect -- "| i32 | INTEGER | NO | | NULL | |"
expect -- "| i64 | BIGINT | NO | | NULL | |"
expect -- "| i8 | TINYINT | NO | | NULL | |"
expect -- "| ip4 | TEXT | NO | | NULL | |"
expect -- "| ip6 | TEXT | NO | | NULL | |"
expect -- "| lfs | BLOB | NO | | NULL | |"
expect -- "| lnfs | BLOB | YES | | NULL | |"
expect -- "| lns | TEXT | YES | | NULL | |"
expect -- "| ls | TEXT | NO | | NULL | |"
expect -- "| m | JSON | NO | | NULL | |"
expect -- "| m_complex | JSON | NO | | NULL | |"
expect -- "| mpg | TEXT | NO | | NULL | |"
expect -- "| ndt64 | DATETIME | YES | | NULL | |"
expect -- "| ndt64_tz | DATETIME | YES | | NULL | |"
expect -- "| nested.col1 | TEXT | NO | | NULL | |"
expect -- "| nested.col2 | TEXT | NO | | NULL | |"
expect -- "| nfs | BLOB | YES | | NULL | |"
expect -- "| ns | TEXT | YES | | NULL | |"
expect -- "| o | JSON | NO | | NULL | |"
expect -- "| p | TEXT | NO | | NULL | |"
expect -- "| pg | TEXT | NO | | NULL | |"
expect -- "| r | TEXT | NO | | NULL | |"
expect -- "| s | TEXT | NO | | NULL | |"
expect -- "| sagg | TEXT | NO | | NULL | |"
expect -- "| t | JSON | NO | | NULL | |"
expect -- "| ui128 | TEXT | NO | | NULL | |"
expect -- "| ui16 | SMALLINT UNSIGNED | NO | | NULL | |"
expect -- "| ui256 | TEXT | NO | | NULL | |"
expect -- "| ui32 | INTEGER UNSIGNED | NO | | NULL | |"
expect -- "| ui64 | BIGINT UNSIGNED | NO | | NULL | |"
expect -- "| ui8 | TINYINT UNSIGNED | NO | | NULL | |"
expect -- "| uuid | CHAR | NO | | NULL | |"
expect -- "+---------------+-------------------+------+------+---------+-------+"
send -- "DROP TABLE tab;"
send -- "quit;\r"

View File

@ -0,0 +1,2 @@
b count()
2 1

View File

@ -0,0 +1,14 @@
#!/usr/bin/env bash
# Tags: no-fasttest
# Tag no-fasttest: requires mysql client
CURDIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)
# shellcheck source=../shell_config.sh
. "$CURDIR"/../shell_config.sh
# Some BI tools which connect to ClickHouse's MySQL port, run queries which succeed only with (the analyzer enabled)
# or (without analyzer and setting prefer_column_name_to_alias = 1). Since the setting is too impactful to enable it
# globally, it is enabled only by the MySQL handler internally as a workaround. Run a query from Bug 56173 to verify.
#
# When the analyzer is the new default, the test and the workaround can be deleted.
${MYSQL_CLIENT} --execute "select a + b as b, count() from (select 1 as a, 1 as b) group by a + b";

View File

@ -0,0 +1,21 @@
[]
[]
[NULL]
[NULL]
[]
[[1,2,4,5]]
[]
[1,4,5]
[]
[]
1000000
999999
[9]
['a','c']
1000000
999999
['1']
[]
['2023-01-01 00:00:00']
['2023-01-01']
['2023-01-01']

View File

@ -0,0 +1,91 @@
DROP TABLE IF EXISTS test_empty;
CREATE TABLE test_empty (a Array(Int64)) engine=MergeTree ORDER BY a;
INSERT INTO test_empty VALUES ([]);
SELECT groupArrayIntersect(*) FROM test_empty;
INSERT INTO test_empty VALUES ([1]);
SELECT groupArrayIntersect(*) FROM test_empty;
DROP TABLE test_empty;
DROP TABLE IF EXISTS test_null;
CREATE TABLE test_null (a Array(Nullable(Int64))) engine=MergeTree ORDER BY a SETTINGS allow_nullable_key=1;
INSERT INTO test_null VALUES ([NULL, NULL]);
SELECT groupArrayIntersect(*) FROM test_null;
INSERT INTO test_null VALUES ([NULL]);
SELECT groupArrayIntersect(*) FROM test_null;
INSERT INTO test_null VALUES ([1,2]);
SELECT groupArrayIntersect(*) FROM test_null;
DROP TABLE test_null;
DROP TABLE IF EXISTS test_nested_arrays;
CREATE TABLE test_nested_arrays (a Array(Array(Int64))) engine=MergeTree ORDER BY a;
INSERT INTO test_nested_arrays VALUES ([[1,2,3,4,5,6], [1,2,4,5]]);
INSERT INTO test_nested_arrays VALUES ([[1,2,4,5]]);
SELECT groupArrayIntersect(*) FROM test_nested_arrays;
INSERT INTO test_nested_arrays VALUES ([[1,4,3,0,5,5,5]]);
SELECT groupArrayIntersect(*) FROM test_nested_arrays;
DROP TABLE test_nested_arrays;
DROP TABLE IF EXISTS test_numbers;
CREATE TABLE test_numbers (a Array(Int64)) engine=MergeTree ORDER BY a;
INSERT INTO test_numbers VALUES ([1,2,3,4,5,6]);
INSERT INTO test_numbers VALUES ([1,2,4,5]);
INSERT INTO test_numbers VALUES ([1,4,3,0,5,5,5]);
SELECT groupArrayIntersect(*) FROM test_numbers;
INSERT INTO test_numbers VALUES ([9]);
SELECT groupArrayIntersect(*) FROM test_numbers;
DROP TABLE test_numbers;
DROP TABLE IF EXISTS test_big_numbers_sep;
CREATE TABLE test_big_numbers_sep (a Array(Int64)) engine=MergeTree ORDER BY a;
INSERT INTO test_big_numbers_sep SELECT array(number) FROM numbers_mt(1000000);
SELECT groupArrayIntersect(*) FROM test_big_numbers_sep;
DROP TABLE test_big_numbers_sep;
DROP TABLE IF EXISTS test_big_numbers;
CREATE TABLE test_big_numbers (a Array(Int64)) engine=MergeTree ORDER BY a;
INSERT INTO test_big_numbers SELECT range(1000000);
SELECT length(groupArrayIntersect(*)) FROM test_big_numbers;
INSERT INTO test_big_numbers SELECT range(999999);
SELECT length(groupArrayIntersect(*)) FROM test_big_numbers;
INSERT INTO test_big_numbers VALUES ([9]);
SELECT groupArrayIntersect(*) FROM test_big_numbers;
DROP TABLE test_big_numbers;
DROP TABLE IF EXISTS test_string;
CREATE TABLE test_string (a Array(String)) engine=MergeTree ORDER BY a;
INSERT INTO test_string VALUES (['a', 'b', 'c', 'd', 'e', 'f']);
INSERT INTO test_string VALUES (['a', 'aa', 'b', 'bb', 'c', 'cc', 'd', 'dd', 'f', 'ff']);
INSERT INTO test_string VALUES (['ae', 'ab', 'a', 'bb', 'c']);
SELECT groupArrayIntersect(*) FROM test_string;
DROP TABLE test_string;
DROP TABLE IF EXISTS test_big_string;
CREATE TABLE test_big_string (a Array(String)) engine=MergeTree ORDER BY a;
INSERT INTO test_big_string SELECT groupArray(toString(number)) FROM numbers_mt(1000000);
SELECT length(groupArrayIntersect(*)) FROM test_big_string;
INSERT INTO test_big_string SELECT groupArray(toString(number)) FROM numbers_mt(999999);
SELECT length(groupArrayIntersect(*)) FROM test_big_string;
INSERT INTO test_big_string VALUES (['1']);
SELECT groupArrayIntersect(*) FROM test_big_string;
INSERT INTO test_big_string VALUES (['a']);
SELECT groupArrayIntersect(*) FROM test_big_string;
DROP TABLE test_big_string;
DROP TABLE IF EXISTS test_datetime;
CREATE TABLE test_datetime (a Array(DateTime)) engine=MergeTree ORDER BY a;
INSERT INTO test_datetime VALUES ([toDateTime('2023-01-01 00:00:00'), toDateTime('2023-01-01 01:02:03'), toDateTime('2023-01-01 02:03:04')]);
INSERT INTO test_datetime VALUES ([toDateTime('2023-01-01 00:00:00'), toDateTime('2023-01-01 01:02:04'), toDateTime('2023-01-01 02:03:05')]);
SELECT groupArrayIntersect(*) from test_datetime;
DROP TABLE test_datetime;
DROP TABLE IF EXISTS test_date32;
CREATE TABLE test_date32 (a Array(Date32)) engine=MergeTree ORDER BY a;
INSERT INTO test_date32 VALUES ([toDate32('2023-01-01 00:00:00'), toDate32('2023-01-01 00:00:01')]);
SELECT groupArrayIntersect(*) from test_date32;
DROP TABLE test_date32;
DROP TABLE IF EXISTS test_date;
CREATE TABLE test_date (a Array(Date)) engine=MergeTree ORDER BY a;
INSERT INTO test_date VALUES ([toDate('2023-01-01 00:00:00'), toDate('2023-01-01 00:00:01')]);
SELECT groupArrayIntersect(*) from test_date;
DROP TABLE test_date;

View File

@ -0,0 +1,2 @@
2024-02-20 16:53:57.105
2024-02-21 12:00:00.000

View File

@ -0,0 +1,5 @@
-- Tags: no-fasttest
SET session_timezone='Europe/Madrid'; -- disable time zone randomization in CI
SELECT if(length(x) = 26, ULIDStringToDateTime(x, 'Europe/Madrid'), toDateTime('2024-02-21 12:00:00', 'Europe/Madrid')) AS datetime
FROM values('x String', '01HQ3KJJKHRWP357YVYBX32WHY', '01HQ3KJJKH')

View File

@ -23,7 +23,7 @@ static void checkByCompressedReadBuffer(const std::string & mrk_path, const std:
DB::CompressedReadBufferFromFile bin_in(DB::createReadBufferFromFileBase(bin_path, /* settings= */ {}));
DB::WriteBufferFromFileDescriptor out(STDOUT_FILENO);
bool mrk2_format = boost::algorithm::ends_with(mrk_path, ".mrk2");
bool mrk2_format = mrk_path.ends_with(".mrk2");
for (size_t mark_num = 0; !mrk_in.eof(); ++mark_num)
{

View File

@ -1615,6 +1615,8 @@ greaterorequals
greenspace
groupArray
groupArrayInsertAt
grouparrayintersect
groupArrayIntersect
groupArrayLast
groupArrayMovingAvg
groupArrayMovingSum
@ -1742,6 +1744,7 @@ isValidJSON
isValidUTF
isZeroOrNull
iteratively
iTerm
jaccard
jaccardIndex
jaroSimilarity
@ -2314,6 +2317,7 @@ shardNum
sharded
sharding
shortcircuit
Shortkeys
shortkeys
shoutout
simdjson