Merge branch 'master' of github.com:ClickHouse/ClickHouse into divanik/generate_series_function

This commit is contained in:
divanik 2024-03-14 11:41:14 +00:00
commit 9c77aa9510
155 changed files with 2609 additions and 930 deletions

View File

@ -108,16 +108,22 @@
{
[[noreturn]] void abortOnFailedAssertion(const String & description);
}
#define chassert(x) do { static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x); } while (0)
#define chassert_1(x, ...) do { static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(#x); } while (0)
#define chassert_2(x, comment, ...) do { static_cast<bool>(x) ? void(0) : ::DB::abortOnFailedAssertion(comment); } while (0)
#define UNREACHABLE() abort()
// clang-format off
#else
/// Here sizeof() trick is used to suppress unused warning for result,
/// since simple "(void)x" will evaluate the expression, while
/// "sizeof(!(x))" will not.
#define chassert(x) (void)sizeof(!(x))
#define chassert_1(x, ...) (void)sizeof(!(x))
#define chassert_2(x, comment, ...) (void)sizeof(!(x))
#define UNREACHABLE() __builtin_unreachable()
#endif
#define CHASSERT_DISPATCH(_1,_2, N,...) N(_1, _2)
#define CHASSERT_INVOKE(tuple) CHASSERT_DISPATCH tuple
#define chassert(...) CHASSERT_INVOKE((__VA_ARGS__, chassert_2, chassert_1))
#endif
/// Macros for Clang Thread Safety Analysis (TSA). They can be safely ignored by other compilers.

View File

@ -93,7 +93,7 @@ void TCPServerDispatcher::release()
void TCPServerDispatcher::run()
{
AutoPtr<TCPServerDispatcher> guard(this, true); // ensure object stays alive
AutoPtr<TCPServerDispatcher> guard(this); // ensure object stays alive
int idleTime = (int) _pParams->getThreadIdleTime().totalMilliseconds();
@ -149,11 +149,13 @@ void TCPServerDispatcher::enqueue(const StreamSocket& socket)
{
try
{
this->duplicate();
_threadPool.startWithPriority(_pParams->getThreadPriority(), *this, threadName);
++_currentThreads;
}
catch (Poco::Exception& exc)
{
this->release();
++_refusedConnections;
std::cerr << "Got exception while starting thread for connection. Error code: "
<< exc.code() << ", message: '" << exc.displayText() << "'" << std::endl;

2
contrib/curl vendored

@ -1 +1 @@
Subproject commit 5ce164e0e9290c96eb7d502173426c0a135ec008
Subproject commit 1a05e833f8f7140628b27882b10525fd9ec4b873

View File

@ -78,8 +78,8 @@ It is recommended to use official pre-compiled `deb` packages for Debian or Ubun
#### Setup the Debian repository
``` bash
sudo apt-get install -y apt-transport-https ca-certificates dirmngr
sudo gpg --no-default-keyring --keyring /usr/share/keyrings/clickhouse-keyring.gpg --keyserver hkp://keyserver.ubuntu.com:80 --recv-keys 8919F6BD2B48D754
sudo apt-get install -y apt-transport-https ca-certificates curl gnupg
curl -fsSL 'https://packages.clickhouse.com/rpm/lts/repodata/repomd.xml.key' | sudo gpg --dearmor -o /usr/share/keyrings/clickhouse-keyring.gpg
echo "deb [signed-by=/usr/share/keyrings/clickhouse-keyring.gpg] https://packages.clickhouse.com/deb stable main" | sudo tee \
/etc/apt/sources.list.d/clickhouse.list

View File

@ -16,7 +16,9 @@ ClickHouse also supports:
## NULL Processing
During aggregation, all `NULL`s are skipped. If the aggregation has several parameters it will ignore any row in which one or more of the parameters are NULL.
During aggregation, all `NULL` arguments are skipped. If the aggregation has several arguments it will ignore any row in which one or more of them are NULL.
There is an exception to this rule, which are the functions [`first_value`](../../sql-reference/aggregate-functions/reference/first_value.md), [`last_value`](../../sql-reference/aggregate-functions/reference/last_value.md) and their aliases when followed by the modifier `RESPECT NULLS`: `FIRST_VALUE(b) RESPECT NULLS`.
**Examples:**
@ -85,3 +87,50 @@ FROM t_null_big;
│ [2,2,3] │ [2,NULL,2,3,NULL] │
└───────────────┴───────────────────────────────────────┘
```
Note that aggregations are skipped when the columns are used as arguments to an aggregated function. For example [`count`](../../sql-reference/aggregate-functions/reference/count.md) without parameters (`count()`) or with constant ones (`count(1)`) will count all rows in the block (independently of the value of the GROUP BY column as it's not an argument), while `count(column)` will only return the number of rows where column is not NULL.
```sql
SELECT
v,
count(1),
count(v)
FROM
(
SELECT if(number < 10, NULL, number % 3) AS v
FROM numbers(15)
)
GROUP BY v
┌────v─┬─count()─┬─count(v)─┐
│ ᴺᵁᴸᴸ │ 10 │ 0 │
│ 0 │ 1 │ 1 │
│ 1 │ 2 │ 2 │
│ 2 │ 2 │ 2 │
└──────┴─────────┴──────────┘
```
And here is an example of of first_value with `RESPECT NULLS` where we can see that NULL inputs are respected and it will return the first value read, whether it's NULL or not:
```sql
SELECT
col || '_' || ((col + 1) * 5 - 1) as range,
first_value(odd_or_null) as first,
first_value(odd_or_null) IGNORE NULLS as first_ignore_null,
first_value(odd_or_null) RESPECT NULLS as first_respect_nulls
FROM
(
SELECT
intDiv(number, 5) AS col,
if(number % 2 == 0, NULL, number) as odd_or_null
FROM numbers(15)
)
GROUP BY col
ORDER BY col
┌─range─┬─first─┬─first_ignore_null─┬─first_respect_nulls─┐
│ 0_4 │ 1 │ 1 │ ᴺᵁᴸᴸ │
│ 1_9 │ 5 │ 5 │ 5 │
│ 2_14 │ 11 │ 11 │ ᴺᵁᴸᴸ │
└───────┴───────┴───────────────────┴─────────────────────┘
```

View File

@ -1,16 +1,99 @@
---
slug: /en/sql-reference/aggregate-functions/reference/varpop
title: "varPop"
slug: "/en/sql-reference/aggregate-functions/reference/varpop"
sidebar_position: 32
---
# varPop(x)
This page covers the `varPop` and `varPopStable` functions available in ClickHouse.
Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`.
## varPop
In other words, dispersion for a set of values. Returns `Float64`.
Calculates the population covariance between two data columns. The population covariance measures the degree to which two variables vary together. Calculates the amount `Σ((x - x̅)^2) / n`, where `n` is the sample size and `x̅`is the average value of `x`.
Alias: `VAR_POP`.
**Syntax**
:::note
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varPopStable` function. It works slower but provides a lower computational error.
:::
```sql
covarPop(x, y)
```
**Parameters**
- `x`: The first data column. [Numeric](../../../native-protocol/columns.md)
- `y`: The second data column. [Numeric](../../../native-protocol/columns.md)
**Returned value**
Returns an integer of type `Float64`.
**Implementation details**
This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varPopStable` function](#varPopStable).
**Example**
```sql
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data
(
x Int32,
y Int32
)
ENGINE = Memory;
INSERT INTO test_data VALUES (1, 2), (2, 3), (3, 5), (4, 6), (5, 8);
SELECT
covarPop(x, y) AS covar_pop
FROM test_data;
```
```response
3
```
## varPopStable
Calculates population covariance between two data columns using a stable, numerically accurate method to calculate the variance. This function is designed to provide reliable results even with large datasets or values that might cause numerical instability in other implementations.
**Syntax**
```sql
covarPopStable(x, y)
```
**Parameters**
- `x`: The first data column. [String literal](../syntax#syntax-string-literal)
- `y`: The second data column. [Expression](../syntax#syntax-expressions)
**Returned value**
Returns an integer of type `Float64`.
**Implementation details**
Unlike [`varPop()`](#varPop), this function uses a stable, numerically accurate algorithm to calculate the population variance to avoid issues like catastrophic cancellation or loss of precision. This function also handles `NaN` and `Inf` values correctly, excluding them from calculations.
**Example**
Query:
```sql
DROP TABLE IF EXISTS test_data;
CREATE TABLE test_data
(
x Int32,
y Int32
)
ENGINE = Memory;
INSERT INTO test_data VALUES (1, 2), (2, 9), (9, 5), (4, 6), (5, 8);
SELECT
covarPopStable(x, y) AS covar_pop_stable
FROM test_data;
```
```response
0.5999999999999999
```

View File

@ -1,18 +1,128 @@
---
title: "varSamp"
slug: /en/sql-reference/aggregate-functions/reference/varsamp
sidebar_position: 33
---
# varSamp
This page contains information on the `varSamp` and `varSampStable` ClickHouse functions.
Calculates the amount `Σ((x - x̅)^2) / (n - 1)`, where `n` is the sample size and `x̅`is the average value of `x`.
## varSamp
It represents an unbiased estimate of the variance of a random variable if passed values from its sample.
Calculate the sample variance of a data set.
Returns `Float64`. When `n <= 1`, returns `+∞`.
**Syntax**
Alias: `VAR_SAMP`.
```sql
varSamp(expr)
```
:::note
This function uses a numerically unstable algorithm. If you need [numerical stability](https://en.wikipedia.org/wiki/Numerical_stability) in calculations, use the `varSampStable` function. It works slower but provides a lower computational error.
:::
**Parameters**
- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions)
**Returned value**
Returns a Float64 value representing the sample variance of the input data set.
**Implementation details**
The `varSamp()` function calculates the sample variance using the following formula:
```plaintext
∑(x - mean(x))^2 / (n - 1)
```
Where:
- `x` is each individual data point in the data set.
- `mean(x)` is the arithmetic mean of the data set.
- `n` is the number of data points in the data set.
The function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPop()` function](./varpop#varpop) instead.
This function uses a numerically unstable algorithm. If you need numerical stability in calculations, use the slower but more stable [`varSampStable` function](#varSampStable).
**Example**
Query:
```sql
CREATE TABLE example_table
(
id UInt64,
value Float64
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7);
SELECT varSamp(value) FROM example_table;
```
Response:
```response
0.8650000000000091
```
## varSampStable
Calculate the sample variance of a data set using a numerically stable algorithm.
**Syntax**
```sql
varSampStable(expr)
```
**Parameters**
- `expr`: An expression representing the data set for which you want to calculate the sample variance. [Expression](../syntax#syntax-expressions)
**Returned value**
The `varSampStable()` function returns a Float64 value representing the sample variance of the input data set.
**Implementation details**
The `varSampStable()` function calculates the sample variance using the same formula as the [`varSamp()`](#varSamp function):
```plaintext
∑(x - mean(x))^2 / (n - 1)
```
Where:
- `x` is each individual data point in the data set.
- `mean(x)` is the arithmetic mean of the data set.
- `n` is the number of data points in the data set.
The difference between `varSampStable()` and `varSamp()` is that `varSampStable()` is designed to provide a more deterministic and stable result when dealing with floating-point arithmetic. It uses an algorithm that minimizes the accumulation of rounding errors, which can be particularly important when dealing with large data sets or data with a wide range of values.
Like `varSamp()`, the `varSampStable()` function assumes that the input data set represents a sample from a larger population. If you want to calculate the variance of the entire population (when you have the complete data set), you should use the [`varPopStable()` function](./varpop#varpopstable) instead.
**Example**
Query:
```sql
CREATE TABLE example_table
(
id UInt64,
value Float64
)
ENGINE = MergeTree
ORDER BY id;
INSERT INTO example_table VALUES (1, 10.5), (2, 12.3), (3, 9.8), (4, 11.2), (5, 10.7);
SELECT varSampStable(value) FROM example_table;
```
Response:
```response
0.865
```
This query calculates the sample variance of the `value` column in the `example_table` using the `varSampStable()` function. The result shows that the sample variance of the values `[10.5, 12.3, 9.8, 11.2, 10.7]` is approximately 0.865, which may differ slightly from the result of `varSamp()` due to the more precise handling of floating-point arithmetic.

View File

@ -10,6 +10,8 @@ sidebar_label: Nullable
Returns whether the argument is [NULL](../../sql-reference/syntax.md#null).
See also operator [`IS NULL`](../operators/index.md#is_null).
``` sql
isNull(x)
```
@ -54,6 +56,8 @@ Result:
Returns whether the argument is not [NULL](../../sql-reference/syntax.md#null-literal).
See also operator [`IS NOT NULL`](../operators/index.md#is_not_null).
``` sql
isNotNull(x)
```

View File

@ -5,80 +5,372 @@ sidebar_label: JSON
---
There are two sets of functions to parse JSON.
- `visitParam*` (`simpleJSON*`) is made to parse a special very limited subset of a JSON, but these functions are extremely fast.
- `simpleJSON*` (`visitParam*`) is made to parse a special very limited subset of a JSON, but these functions are extremely fast.
- `JSONExtract*` is made to parse normal JSON.
# visitParam functions
# simpleJSON/visitParam functions
ClickHouse has special functions for working with simplified JSON. All these JSON functions are based on strong assumptions about what the JSON can be, but they try to do as little as possible to get the job done.
The following assumptions are made:
1. The field name (function argument) must be a constant.
2. The field name is somehow canonically encoded in JSON. For example: `visitParamHas('{"abc":"def"}', 'abc') = 1`, but `visitParamHas('{"\\u0061\\u0062\\u0063":"def"}', 'abc') = 0`
2. The field name is somehow canonically encoded in JSON. For example: `simpleJSONHas('{"abc":"def"}', 'abc') = 1`, but `simpleJSONHas('{"\\u0061\\u0062\\u0063":"def"}', 'abc') = 0`
3. Fields are searched for on any nesting level, indiscriminately. If there are multiple matching fields, the first occurrence is used.
4. The JSON does not have space characters outside of string literals.
## visitParamHas(params, name)
## simpleJSONHas
Checks whether there is a field with the `name` name.
Checks whether there is a field named `field_name`. The result is `UInt8`.
Alias: `simpleJSONHas`.
**Syntax**
## visitParamExtractUInt(params, name)
Parses UInt64 from the value of the field named `name`. If this is a string field, it tries to parse a number from the beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns 0.
Alias: `simpleJSONExtractUInt`.
## visitParamExtractInt(params, name)
The same as for Int64.
Alias: `simpleJSONExtractInt`.
## visitParamExtractFloat(params, name)
The same as for Float64.
Alias: `simpleJSONExtractFloat`.
## visitParamExtractBool(params, name)
Parses a true/false value. The result is UInt8.
Alias: `simpleJSONExtractBool`.
## visitParamExtractRaw(params, name)
Returns the value of a field, including separators.
Alias: `simpleJSONExtractRaw`.
Examples:
``` sql
visitParamExtractRaw('{"abc":"\\n\\u0000"}', 'abc') = '"\\n\\u0000"';
visitParamExtractRaw('{"abc":{"def":[1,2,3]}}', 'abc') = '{"def":[1,2,3]}';
```sql
simpleJSONHas(json, field_name)
```
## visitParamExtractString(params, name)
**Parameters**
Parses the string in double quotes. The value is unescaped. If unescaping failed, it returns an empty string.
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
Alias: `simpleJSONExtractString`.
**Returned value**
Examples:
It returns `1` if the field exists, `0` otherwise.
``` sql
visitParamExtractString('{"abc":"\\n\\u0000"}', 'abc') = '\n\0';
visitParamExtractString('{"abc":"\\u263a"}', 'abc') = '☺';
visitParamExtractString('{"abc":"\\u263"}', 'abc') = '';
visitParamExtractString('{"abc":"hello}', 'abc') = '';
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"true","qux":1}');
SELECT simpleJSONHas(json, 'foo') FROM jsons;
SELECT simpleJSONHas(json, 'bar') FROM jsons;
```
```response
1
0
```
## simpleJSONExtractUInt
Parses `UInt64` from the value of the field named `field_name`. If this is a string field, it tries to parse a number from the beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns `0`.
**Syntax**
```sql
simpleJSONExtractUInt(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns the number parsed from the field if the field exists and contains a number, `0` otherwise.
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"4e3"}');
INSERT INTO jsons VALUES ('{"foo":3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractUInt(json, 'foo') FROM jsons ORDER BY json;
```
```response
0
4
0
3
5
```
## simpleJSONExtractInt
Parses `Int64` from the value of the field named `field_name`. If this is a string field, it tries to parse a number from the beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns `0`.
**Syntax**
```sql
simpleJSONExtractInt(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns the number parsed from the field if the field exists and contains a number, `0` otherwise.
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractInt(json, 'foo') FROM jsons ORDER BY json;
```
```response
0
-4
0
-3
5
```
## simpleJSONExtractFloat
Parses `Float64` from the value of the field named `field_name`. If this is a string field, it tries to parse a number from the beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns `0`.
**Syntax**
```sql
simpleJSONExtractFloat(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns the number parsed from the field if the field exists and contains a number, `0` otherwise.
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractFloat(json, 'foo') FROM jsons ORDER BY json;
```
```response
0
-4000
0
-3.4
5
```
## simpleJSONExtractBool
Parses a true/false value from the value of the field named `field_name`. The result is `UInt8`.
**Syntax**
```sql
simpleJSONExtractBool(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns `1` if the value of the field is `true`, `0` otherwise. This means this function will return `0` including (and not only) in the following cases:
- If the field doesn't exists.
- If the field contains `true` as a string, e.g.: `{"field":"true"}`.
- If the field contains `1` as a numerical value.
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":false,"bar":true}');
INSERT INTO jsons VALUES ('{"foo":"true","qux":1}');
SELECT simpleJSONExtractBool(json, 'bar') FROM jsons ORDER BY json;
SELECT simpleJSONExtractBool(json, 'foo') FROM jsons ORDER BY json;
```
```response
0
1
0
0
```
## simpleJSONExtractRaw
Returns the value of the field named `field_name` as a `String`, including separators.
**Syntax**
```sql
simpleJSONExtractRaw(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns the value of the field as a [`String`](../../sql-reference/data-types/string.md#string), including separators if the field exists, or an empty `String` otherwise.
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":{"def":[1,2,3]}}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractRaw(json, 'foo') FROM jsons ORDER BY json;
```
```response
"-4e3"
-3.4
5
{"def":[1,2,3]}
```
## simpleJSONExtractString
Parses `String` in double quotes from the value of the field named `field_name`.
**Syntax**
```sql
simpleJSONExtractString(json, field_name)
```
**Parameters**
- `json`: The JSON in which the field is searched for. [String](../../sql-reference/data-types/string.md#string)
- `field_name`: The name of the field to search for. [String literal](../syntax#string)
**Returned value**
It returns the value of a field as a [`String`](../../sql-reference/data-types/string.md#string), including separators. The value is unescaped. It returns an empty `String`: if the field doesn't contain a double quoted string, if unescaping fails or if the field doesn't exist.
**Implementation details**
There is currently no support for code points in the format `\uXXXX\uYYYY` that are not from the basic multilingual plane (they are converted to CESU-8 instead of UTF-8).
**Example**
Query:
```sql
CREATE TABLE jsons
(
`json` String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"\\n\\u0000"}');
INSERT INTO jsons VALUES ('{"foo":"\\u263"}');
INSERT INTO jsons VALUES ('{"foo":"\\u263a"}');
INSERT INTO jsons VALUES ('{"foo":"hello}');
SELECT simpleJSONExtractString(json, 'foo') FROM jsons ORDER BY json;
```
```response
\n\0
```
## visitParamHas
This function is [an alias of `simpleJSONHas`](./json-functions#simplejsonhas).
## visitParamExtractUInt
This function is [an alias of `simpleJSONExtractUInt`](./json-functions#simplejsonextractuint).
## visitParamExtractInt
This function is [an alias of `simpleJSONExtractInt`](./json-functions#simplejsonextractint).
## visitParamExtractFloat
This function is [an alias of `simpleJSONExtractFloat`](./json-functions#simplejsonextractfloat).
## visitParamExtractBool
This function is [an alias of `simpleJSONExtractBool`](./json-functions#simplejsonextractbool).
## visitParamExtractRaw
This function is [an alias of `simpleJSONExtractRaw`](./json-functions#simplejsonextractraw).
## visitParamExtractString
This function is [an alias of `simpleJSONExtractString`](./json-functions#simplejsonextractstring).
# JSONExtract functions
The following functions are based on [simdjson](https://github.com/lemire/simdjson) designed for more complex JSON parsing requirements.

View File

@ -299,6 +299,18 @@ sin(x)
Type: [Float*](../../sql-reference/data-types/float.md).
**Example**
Query:
```sql
SELECT sin(1.23);
```
```response
0.9424888019316975
```
## cos
Returns the cosine of the argument.

File diff suppressed because it is too large Load Diff

View File

@ -588,8 +588,41 @@ Result:
## substringUTF8
Like `substring` but for Unicode code points. Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
Returns the substring of a string `s` which starts at the specified byte index `offset` for Unicode code points. Byte counting starts from `1`. If `offset` is `0`, an empty string is returned. If `offset` is negative, the substring starts `pos` characters from the end of the string, rather than from the beginning. An optional argument `length` specifies the maximum number of bytes the returned substring may have.
Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
**Syntax**
```sql
substringUTF8(s, offset[, length])
```
**Arguments**
- `s`: The string to calculate a substring from. [String](../../sql-reference/data-types/string.md), [FixedString](../../sql-reference/data-types/fixedstring.md) or [Enum](../../sql-reference/data-types/enum.md)
- `offset`: The starting position of the substring in `s` . [(U)Int*](../../sql-reference/data-types/int-uint.md).
- `length`: The maximum length of the substring. [(U)Int*](../../sql-reference/data-types/int-uint.md). Optional.
**Returned value**
A substring of `s` with `length` many bytes, starting at index `offset`.
**Implementation details**
Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
**Example**
```sql
SELECT 'Täglich grüßt das Murmeltier.' AS str,
substringUTF8(str, 9),
substringUTF8(str, 9, 5)
```
```response
Täglich grüßt das Murmeltier. grüßt das Murmeltier. grüßt
```
## substringIndex
@ -624,7 +657,39 @@ Result:
## substringIndexUTF8
Like `substringIndex` but for Unicode code points. Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
Returns the substring of `s` before `count` occurrences of the delimiter `delim`, specifically for Unicode code points.
Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
**Syntax**
```sql
substringIndexUTF8(s, delim, count)
```
**Arguments**
- `s`: The string to extract substring from. [String](../../sql-reference/data-types/string.md).
- `delim`: The character to split. [String](../../sql-reference/data-types/string.md).
- `count`: The number of occurrences of the delimiter to count before extracting the substring. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned. [UInt or Int](../data-types/int-uint.md)
**Returned value**
A substring [String](../../sql-reference/data-types/string.md) of `s` before `count` occurrences of `delim`.
**Implementation details**
Assumes that the string contains valid UTF-8 encoded text. If this assumption is violated, no exception is thrown and the result is undefined.
**Example**
```sql
SELECT substringIndexUTF8('www.straßen-in-europa.de', '.', 2)
```
```response
www.straßen-in-europa
```
## appendTrailingCharIfAbsent

View File

@ -353,7 +353,7 @@ For efficiency, the `and` and `or` functions accept any number of arguments. The
ClickHouse supports the `IS NULL` and `IS NOT NULL` operators.
### IS NULL
### IS NULL {#is_null}
- For [Nullable](../../sql-reference/data-types/nullable.md) type values, the `IS NULL` operator returns:
- `1`, if the value is `NULL`.
@ -374,7 +374,7 @@ SELECT x+100 FROM t_null WHERE y IS NULL
└──────────────┘
```
### IS NOT NULL
### IS NOT NULL {#is_not_null}
- For [Nullable](../../sql-reference/data-types/nullable.md) type values, the `IS NOT NULL` operator returns:
- `0`, if the value is `NULL`.

View File

@ -5,7 +5,7 @@ sidebar_label: cluster
title: "cluster, clusterAllReplicas"
---
Allows to access all shards in an existing cluster which configured in `remote_servers` section without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. One replica of each shard is queried.
Allows to access all shards (configured in the `remote_servers` section) of a cluster without creating a [Distributed](../../engines/table-engines/special/distributed.md) table. Only one replica of each shard is queried.
`clusterAllReplicas` function — same as `cluster`, but all replicas are queried. Each replica in a cluster is used as a separate shard/connection.

View File

@ -50,6 +50,7 @@
#include <Functions/registerFunctions.h>
#include <AggregateFunctions/registerAggregateFunctions.h>
#include <Formats/registerFormats.h>
#include <Formats/FormatFactory.h>
namespace fs = std::filesystem;
using namespace std::literals;
@ -1137,6 +1138,13 @@ void Client::processOptions(const OptionsDescription & options_description,
}
static bool checkIfStdoutIsRegularFile()
{
struct stat file_stat;
return fstat(STDOUT_FILENO, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
void Client::processConfig()
{
if (!queries.empty() && config().has("queries-file"))
@ -1173,7 +1181,14 @@ void Client::processConfig()
pager = config().getString("pager", "");
is_default_format = !config().has("vertical") && !config().has("format");
if (config().has("vertical"))
if (is_default_format && checkIfStdoutIsRegularFile())
{
is_default_format = false;
std::optional<String> format_from_file_name;
format_from_file_name = FormatFactory::instance().tryGetFormatFromFileDescriptor(STDOUT_FILENO);
format = format_from_file_name ? *format_from_file_name : "TabSeparated";
}
else if (config().has("vertical"))
format = config().getString("format", "Vertical");
else
format = config().getString("format", is_interactive ? "PrettyCompact" : "TabSeparated");

View File

@ -327,6 +327,14 @@ static bool checkIfStdinIsRegularFile()
return fstat(STDIN_FILENO, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
static bool checkIfStdoutIsRegularFile()
{
struct stat file_stat;
return fstat(STDOUT_FILENO, &file_stat) == 0 && S_ISREG(file_stat.st_mode);
}
std::string LocalServer::getInitialCreateTableQuery()
{
if (!config().has("table-structure") && !config().has("table-file") && !config().has("table-data-format") && (!checkIfStdinIsRegularFile() || queries.empty()))
@ -638,7 +646,14 @@ void LocalServer::processConfig()
if (config().has("macros"))
global_context->setMacros(std::make_unique<Macros>(config(), "macros", log));
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
if (!config().has("output-format") && !config().has("format") && checkIfStdoutIsRegularFile())
{
std::optional<String> format_from_file_name;
format_from_file_name = FormatFactory::instance().tryGetFormatFromFileDescriptor(STDOUT_FILENO);
format = format_from_file_name ? *format_from_file_name : "TSV";
}
else
format = config().getString("output-format", config().getString("format", is_interactive ? "PrettyCompact" : "TSV"));
insert_format = "Values";
/// Setting value from cmd arg overrides one from config

View File

@ -182,11 +182,14 @@ public:
if constexpr (Trait::sampler == Sampler::NONE)
{
if (limit_num_elems && cur_elems.value.size() >= max_elems)
if constexpr (limit_num_elems)
{
if constexpr (Trait::last)
cur_elems.value[(cur_elems.total_values - 1) % max_elems] = row_value;
return;
if (cur_elems.value.size() >= max_elems)
{
if constexpr (Trait::last)
cur_elems.value[(cur_elems.total_values - 1) % max_elems] = row_value;
return;
}
}
cur_elems.value.push_back(row_value, arena);
@ -236,7 +239,7 @@ public:
void mergeNoSampler(Data & cur_elems, const Data & rhs_elems, Arena * arena) const
{
if (!limit_num_elems)
if constexpr (!limit_num_elems)
{
if (rhs_elems.value.size())
cur_elems.value.insertByOffsets(rhs_elems.value, 0, rhs_elems.value.size(), arena);

View File

@ -310,9 +310,11 @@ public:
{
for (Field & element : values)
{
UInt8 is_null = 0;
readBinary(is_null, buf);
if (!is_null)
/// We must initialize the Field type since some internal functions (like operator=) use them
new (&element) Field;
bool has_value = false;
readBinary(has_value, buf);
if (has_value)
serialization->deserializeBinary(element, buf, {});
}
}

View File

@ -144,7 +144,7 @@ public:
count = other.count;
compressed = other.compressed;
sampled.resize(other.sampled.size());
sampled.resize_exact(other.sampled.size());
memcpy(sampled.data(), other.sampled.data(), sizeof(Stats) * other.sampled.size());
return;
}
@ -180,7 +180,7 @@ public:
compress();
backup_sampled.clear();
backup_sampled.reserve(sampled.size() + other.sampled.size());
backup_sampled.reserve_exact(sampled.size() + other.sampled.size());
double merged_relative_error = std::max(relative_error, other.relative_error);
size_t merged_count = count + other.count;
Int64 additional_self_delta = static_cast<Int64>(std::floor(2 * other.relative_error * other.count));
@ -268,11 +268,7 @@ public:
size_t sampled_len = 0;
readBinaryLittleEndian(sampled_len, buf);
if (sampled_len > compress_threshold)
throw Exception(
ErrorCodes::INCORRECT_DATA, "The number of elements {} for quantileGK exceeds {}", sampled_len, compress_threshold);
sampled.resize(sampled_len);
sampled.resize_exact(sampled_len);
for (size_t i = 0; i < sampled_len; ++i)
{
@ -317,7 +313,7 @@ private:
::sort(head_sampled.begin(), head_sampled.end());
backup_sampled.clear();
backup_sampled.reserve(sampled.size() + head_sampled.size());
backup_sampled.reserve_exact(sampled.size() + head_sampled.size());
size_t sample_idx = 0;
size_t ops_idx = 0;

View File

@ -14,6 +14,8 @@
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <boost/algorithm/string.hpp>
namespace DB
{

View File

@ -1,5 +1,7 @@
#include <Common/FunctionDocumentation.h>
#include <boost/algorithm/string.hpp>
namespace DB
{
@ -31,14 +33,7 @@ std::string FunctionDocumentation::examplesAsString() const
std::string FunctionDocumentation::categoriesAsString() const
{
if (categories.empty())
return "";
auto it = categories.begin();
std::string res = *it;
for (; it != categories.end(); ++it)
res += ", " + *it;
return res;
return boost::algorithm::join(categories, ", ");
}
}

View File

@ -476,6 +476,7 @@ The server successfully detected this situation and will download merged part fr
M(FileSegmentRemoveMicroseconds, "File segment remove() time") \
M(FileSegmentHolderCompleteMicroseconds, "File segments holder complete() time") \
M(FileSegmentFailToIncreasePriority, "Number of times the priority was not increased due to a high contention on the cache lock") \
M(FilesystemCacheFailToReserveSpaceBecauseOfLockContention, "Number of times space reservation was skipped due to a high contention on the cache lock") \
M(FilesystemCacheHoldFileSegments, "Filesystem cache file segments count, which were hold") \
M(FilesystemCacheUnusedHoldFileSegments, "Filesystem cache file segments count, which were hold, but not used (because of seek or LIMIT n, etc)") \
\

View File

@ -123,17 +123,15 @@ protected:
std::string getServerUrl() const
{
return "http://" + server_data.socket->address().toString();
return "http://" + server_data.server->socket().address().toString();
}
void startServer()
{
server_data.reset();
server_data.params = new Poco::Net::HTTPServerParams();
server_data.socket = std::make_unique<Poco::Net::ServerSocket>(server_data.port);
server_data.handler_factory = new HTTPRequestHandlerFactory(slowdown_receive);
server_data.server = std::make_unique<Poco::Net::HTTPServer>(
server_data.handler_factory, *server_data.socket, server_data.params);
server_data.handler_factory, server_data.port);
server_data.server->start();
}
@ -155,8 +153,7 @@ protected:
{
// just some port to avoid collisions with others tests
UInt16 port = 9871;
Poco::Net::HTTPServerParams::Ptr params;
std::unique_ptr<Poco::Net::ServerSocket> socket;
HTTPRequestHandlerFactory::Ptr handler_factory;
std::unique_ptr<Poco::Net::HTTPServer> server;
@ -171,8 +168,6 @@ protected:
server = nullptr;
handler_factory = nullptr;
socket = nullptr;
params = nullptr;
}
~ServerData() {

View File

@ -374,7 +374,7 @@ void Context::updateKeeperConfiguration([[maybe_unused]] const Poco::Util::Abstr
if (!shared->keeper_dispatcher)
return;
shared->keeper_dispatcher->updateConfiguration(getConfigRef(), getMacros());
shared->keeper_dispatcher->updateConfiguration(config_, getMacros());
}
std::shared_ptr<zkutil::ZooKeeper> Context::getZooKeeper() const

View File

@ -15,6 +15,7 @@ namespace ErrorCodes
extern const int THERE_IS_NO_PROFILE;
extern const int NO_ELEMENTS_IN_CONFIG;
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
extern const int BAD_ARGUMENTS;
}
IMPLEMENT_SETTINGS_TRAITS(SettingsTraits, LIST_OF_SETTINGS)
@ -114,7 +115,11 @@ std::vector<String> Settings::getAllRegisteredNames() const
void Settings::set(std::string_view name, const Field & value)
{
if (name == "compatibility")
{
if (value.getType() != Field::Types::Which::String)
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unexpected type of value for setting 'compatibility'. Expected String, got {}", value.getTypeName());
applyCompatibilitySetting(value.get<String>());
}
/// If we change setting that was changed by compatibility setting before
/// we should remove it from settings_changed_by_compatibility_setting,
/// otherwise the next time we will change compatibility setting

View File

@ -778,6 +778,7 @@ class IColumn;
M(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), "Max remote filesystem cache size that can be downloaded by a single query", 0) \
M(Bool, throw_on_error_from_cache_on_write_operations, false, "Ignore error from cache when caching on write operations (INSERT, merges)", 0) \
M(UInt64, filesystem_cache_segments_batch_size, 20, "Limit on size of a single batch of file segments that a read buffer can request from cache. Too low value will lead to excessive requests to cache, too large may slow down eviction from cache", 0) \
M(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, "Wait time to lock cache for sapce reservation in filesystem cache", 0) \
\
M(Bool, use_page_cache_for_disks_without_file_cache, false, "Use userspace page cache for remote disks that don't have filesystem cache enabled.", 0) \
M(Bool, read_from_page_cache_if_exists_otherwise_bypass_cache, false, "Use userspace page cache in passive mode, similar to read_from_filesystem_cache_if_exists_otherwise_bypass_cache.", 0) \

View File

@ -93,6 +93,7 @@ static std::map<ClickHouseVersion, SettingsChangesHistory::SettingsChanges> sett
{"input_format_json_use_string_type_for_ambiguous_paths_in_named_tuples_inference_from_objects", false, false, "Allow to use String type for ambiguous paths during named tuple inference from JSON objects"},
{"throw_if_deduplication_in_dependent_materialized_views_enabled_with_async_insert", false, true, "Deduplication is dependent materialized view cannot work together with async inserts."},
{"parallel_replicas_allow_in_with_subquery", false, true, "If true, subquery for IN will be executed on every follower replica"},
{"filesystem_cache_reserve_space_wait_lock_timeout_milliseconds", 1000, 1000, "Wait time to lock cache for sapce reservation in filesystem cache"},
}},
{"24.2", {{"allow_suspicious_variant_types", true, false, "Don't allow creating Variant type with suspicious variants by default"},
{"validate_experimental_and_suspicious_types_inside_nested_types", false, true, "Validate usage of experimental and suspicious types inside nested types"},

View File

@ -149,7 +149,9 @@ ASTPtr DatabasesOverlay::getCreateTableQueryImpl(const String & name, ContextPtr
*/
ASTPtr DatabasesOverlay::getCreateDatabaseQuery() const
{
return std::make_shared<ASTCreateQuery>();
auto query = std::make_shared<ASTCreateQuery>();
query->setDatabase(getDatabaseName());
return query;
}
String DatabasesOverlay::getTableDataPath(const String & table_name) const

View File

@ -1,7 +1,27 @@
#include "DiskType.h"
#include <Poco/String.h>
#include <Common/Exception.h>
namespace DB
{
namespace ErrorCodes
{
extern const int UNKNOWN_ELEMENT_IN_CONFIG;
}
MetadataStorageType metadataTypeFromString(const String & type)
{
auto check_type = Poco::toLower(type);
if (check_type == "local")
return MetadataStorageType::Local;
if (check_type == "plain")
return MetadataStorageType::Plain;
if (check_type == "web")
return MetadataStorageType::StaticWeb;
throw Exception(ErrorCodes::UNKNOWN_ELEMENT_IN_CONFIG,
"MetadataStorageFactory: unknown metadata storage type: {}", type);
}
bool DataSourceDescription::operator==(const DataSourceDescription & other) const
{
@ -14,4 +34,32 @@ bool DataSourceDescription::sameKind(const DataSourceDescription & other) const
== std::tie(other.type, other.object_storage_type, other.description);
}
std::string DataSourceDescription::toString() const
{
switch (type)
{
case DataSourceType::Local:
return "local";
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
{
switch (object_storage_type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
}
}
}
}
}

View File

@ -17,7 +17,6 @@ enum class ObjectStorageType
{
None,
S3,
S3_Plain,
Azure,
HDFS,
Web,
@ -30,9 +29,11 @@ enum class MetadataStorageType
Local,
Plain,
StaticWeb,
Memory,
};
MetadataStorageType metadataTypeFromString(const String & type);
String toString(DataSourceType data_source_type);
struct DataSourceDescription
{
DataSourceType type;
@ -47,36 +48,7 @@ struct DataSourceDescription
bool operator==(const DataSourceDescription & other) const;
bool sameKind(const DataSourceDescription & other) const;
std::string toString() const
{
switch (type)
{
case DataSourceType::Local:
return "local";
case DataSourceType::RAM:
return "memory";
case DataSourceType::ObjectStorage:
{
switch (object_storage_type)
{
case ObjectStorageType::S3:
return "s3";
case ObjectStorageType::S3_Plain:
return "s3_plain";
case ObjectStorageType::HDFS:
return "hdfs";
case ObjectStorageType::Azure:
return "azure_blob_storage";
case ObjectStorageType::Local:
return "local_blob_storage";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "none";
}
}
}
}
std::string toString() const;
};
}

View File

@ -637,7 +637,8 @@ void CachedOnDiskReadBufferFromFile::predownload(FileSegment & file_segment)
ProfileEvents::increment(ProfileEvents::CachedReadBufferReadFromSourceBytes, current_impl_buffer_size);
bool continue_predownload = file_segment.reserve(current_predownload_size);
bool continue_predownload = file_segment.reserve(
current_predownload_size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
if (continue_predownload)
{
LOG_TEST(log, "Left to predownload: {}, buffer size: {}", bytes_to_predownload, current_impl_buffer_size);
@ -992,7 +993,7 @@ bool CachedOnDiskReadBufferFromFile::nextImplStep()
{
chassert(file_offset_of_buffer_end + size - 1 <= file_segment.range().right);
bool success = file_segment.reserve(size);
bool success = file_segment.reserve(size, settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds);
if (success)
{
chassert(file_segment.getCurrentWriteOffset() == static_cast<size_t>(implementation_buffer->getPosition()));

View File

@ -26,16 +26,18 @@ FileSegmentRangeWriter::FileSegmentRangeWriter(
FileCache * cache_,
const FileSegment::Key & key_,
const FileCacheUserInfo & user_,
size_t reserve_space_lock_wait_timeout_milliseconds_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
const String & query_id_,
const String & source_path_)
: cache(cache_)
, key(key_)
, user(user_)
, reserve_space_lock_wait_timeout_milliseconds(reserve_space_lock_wait_timeout_milliseconds_)
, log(getLogger("FileSegmentRangeWriter"))
, cache_log(cache_log_)
, query_id(query_id_)
, source_path(source_path_)
, user(user_)
{
}
@ -89,7 +91,7 @@ bool FileSegmentRangeWriter::write(const char * data, size_t size, size_t offset
size_t size_to_write = std::min(available_size, size);
bool reserved = file_segment->reserve(size_to_write);
bool reserved = file_segment->reserve(size_to_write, reserve_space_lock_wait_timeout_milliseconds);
if (!reserved)
{
appendFilesystemCacheLog(*file_segment);
@ -211,6 +213,7 @@ CachedOnDiskWriteBufferFromFile::CachedOnDiskWriteBufferFromFile(
, key(key_)
, query_id(query_id_)
, user(user_)
, reserve_space_lock_wait_timeout_milliseconds(settings_.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds)
, throw_on_error_from_cache(settings_.throw_on_error_from_cache)
, cache_log(!query_id_.empty() && settings_.enable_filesystem_cache_log ? cache_log_ : nullptr)
{
@ -251,7 +254,8 @@ void CachedOnDiskWriteBufferFromFile::cacheData(char * data, size_t size, bool t
if (!cache_writer)
{
cache_writer = std::make_unique<FileSegmentRangeWriter>(cache.get(), key, user, cache_log, query_id, source_path);
cache_writer = std::make_unique<FileSegmentRangeWriter>(
cache.get(), key, user, reserve_space_lock_wait_timeout_milliseconds, cache_log, query_id, source_path);
}
Stopwatch watch(CLOCK_MONOTONIC);

View File

@ -30,6 +30,7 @@ public:
FileCache * cache_,
const FileSegment::Key & key_,
const FileCacheUserInfo & user_,
size_t reserve_space_lock_wait_timeout_milliseconds_,
std::shared_ptr<FilesystemCacheLog> cache_log_,
const String & query_id_,
const String & source_path_);
@ -52,13 +53,14 @@ private:
void completeFileSegment();
FileCache * cache;
FileSegment::Key key;
const FileSegment::Key key;
const FileCacheUserInfo user;
const size_t reserve_space_lock_wait_timeout_milliseconds;
LoggerPtr log;
std::shared_ptr<FilesystemCacheLog> cache_log;
String query_id;
String source_path;
FileCacheUserInfo user;
const String query_id;
const String source_path;
FileSegmentsHolderPtr file_segments;
@ -99,11 +101,12 @@ private:
String source_path;
FileCacheKey key;
size_t current_download_offset = 0;
const String query_id;
const FileCacheUserInfo user;
const size_t reserve_space_lock_wait_timeout_milliseconds;
const bool throw_on_error_from_cache;
bool throw_on_error_from_cache;
size_t current_download_offset = 0;
bool cache_in_error_state_or_disabled = false;
std::unique_ptr<FileSegmentRangeWriter> cache_writer;

View File

@ -218,6 +218,7 @@ public:
virtual bool isReadOnly() const { return false; }
virtual bool isWriteOnce() const { return false; }
virtual bool isPlain() const { return false; }
virtual bool supportParallelWrite() const { return false; }

View File

@ -31,6 +31,8 @@ LocalObjectStorage::LocalObjectStorage(String key_prefix_)
description = *block_device_id;
else
description = "/";
fs::create_directories(key_prefix);
}
bool LocalObjectStorage::exists(const StoredObject & object) const
@ -106,13 +108,11 @@ std::unique_ptr<ReadBufferFromFileBase> LocalObjectStorage::readObject( /// NOLI
std::optional<size_t> read_hint,
std::optional<size_t> file_size) const
{
const auto & path = object.remote_path;
if (!file_size)
file_size = tryGetSizeFromFilePath(path);
file_size = tryGetSizeFromFilePath(object.remote_path);
LOG_TEST(log, "Read object: {}", path);
return createReadBufferFromFileBase(path, patchSettings(read_settings), read_hint, file_size);
LOG_TEST(log, "Read object: {}", object.remote_path);
return createReadBufferFromFileBase(object.remote_path, patchSettings(read_settings), read_hint, file_size);
}
std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NOLINT
@ -126,6 +126,11 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
throw Exception(ErrorCodes::BAD_ARGUMENTS, "LocalObjectStorage doesn't support append to files");
LOG_TEST(log, "Write object: {}", object.remote_path);
/// Unlike real blob storage, in local fs we cannot create a file with non-existing prefix.
/// So let's create it.
fs::create_directories(fs::path(object.remote_path).parent_path());
return std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
}
@ -157,9 +162,36 @@ void LocalObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
removeObjectIfExists(object);
}
ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & /* path */) const
ObjectMetadata LocalObjectStorage::getObjectMetadata(const std::string & path) const
{
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Metadata is not supported for LocalObjectStorage");
ObjectMetadata object_metadata;
LOG_TEST(log, "Getting metadata for path: {}", path);
object_metadata.size_bytes = fs::file_size(path);
object_metadata.last_modified = Poco::Timestamp::fromEpochTime(
std::chrono::duration_cast<std::chrono::seconds>(fs::last_write_time(path).time_since_epoch()).count());
return object_metadata;
}
void LocalObjectStorage::listObjects(const std::string & path, RelativePathsWithMetadata & children, int /* max_keys */) const
{
for (const auto & entry : fs::directory_iterator(path))
{
if (entry.is_directory())
{
listObjects(entry.path(), children, 0);
continue;
}
auto metadata = getObjectMetadata(entry.path());
children.emplace_back(entry.path(), std::move(metadata));
}
}
bool LocalObjectStorage::existsOrHasAnyChild(const std::string & path) const
{
/// Unlike real object storage, existence of a prefix path can be checked by
/// just checking existence of this prefix directly, so simple exists is enough here.
return exists(StoredObject(path));
}
void LocalObjectStorage::copyObject( // NOLINT

View File

@ -58,6 +58,10 @@ public:
ObjectMetadata getObjectMetadata(const std::string & path) const override;
void listObjects(const std::string & path, RelativePathsWithMetadata & children, int max_keys) const override;
bool existsOrHasAnyChild(const std::string & path) const override;
void copyObject( /// NOLINT
const StoredObject & object_from,
const StoredObject & object_to,

View File

@ -32,6 +32,35 @@ void MetadataStorageFactory::registerMetadataStorageType(const std::string & met
}
}
std::string MetadataStorageFactory::getCompatibilityMetadataTypeHint(const ObjectStorageType & type)
{
switch (type)
{
case ObjectStorageType::S3:
case ObjectStorageType::HDFS:
case ObjectStorageType::Local:
case ObjectStorageType::Azure:
return "local";
case ObjectStorageType::Web:
return "web";
default:
return "";
}
}
std::string MetadataStorageFactory::getMetadataType(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const std::string & compatibility_type_hint)
{
if (compatibility_type_hint.empty() && !config.has(config_prefix + ".metadata_type"))
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `metadata_type` in config");
}
return config.getString(config_prefix + ".metadata_type", compatibility_type_hint);
}
MetadataStoragePtr MetadataStorageFactory::create(
const std::string & name,
const Poco::Util::AbstractConfiguration & config,
@ -39,12 +68,7 @@ MetadataStoragePtr MetadataStorageFactory::create(
ObjectStoragePtr object_storage,
const std::string & compatibility_type_hint) const
{
if (compatibility_type_hint.empty() && !config.has(config_prefix + ".metadata_type"))
{
throw Exception(ErrorCodes::NO_ELEMENTS_IN_CONFIG, "Expected `metadata_type` in config");
}
const auto type = config.getString(config_prefix + ".metadata_type", compatibility_type_hint);
const auto type = getMetadataType(config, config_prefix, compatibility_type_hint);
const auto it = registry.find(type);
if (it == registry.end())

View File

@ -25,6 +25,13 @@ public:
ObjectStoragePtr object_storage,
const std::string & compatibility_type_hint) const;
static std::string getMetadataType(
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
const std::string & compatibility_type_hint = "");
static std::string getCompatibilityMetadataTypeHint(const ObjectStorageType & type);
private:
using Registry = std::unordered_map<String, Creator>;
Registry registry;

View File

@ -48,10 +48,7 @@ bool MetadataStorageFromPlainObjectStorage::isDirectory(const std::string & path
std::string directory = object_key.serialize();
if (!directory.ends_with('/'))
directory += '/';
RelativePathsWithMetadata files;
object_storage->listObjects(directory, files, 1);
return !files.empty();
return object_storage->existsOrHasAnyChild(directory);
}
uint64_t MetadataStorageFromPlainObjectStorage::getFileSize(const String & path) const

View File

@ -18,6 +18,8 @@
#include <Disks/ObjectStorages/Local/LocalObjectStorage.h>
#include <Disks/loadLocalDiskConfig.h>
#endif
#include <Disks/ObjectStorages/MetadataStorageFactory.h>
#include <Disks/ObjectStorages/PlainObjectStorage.h>
#include <Interpreters/Context.h>
#include <Common/Macros.h>
@ -32,6 +34,36 @@ namespace ErrorCodes
extern const int LOGICAL_ERROR;
}
namespace
{
bool isPlainStorage(
ObjectStorageType type,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix)
{
auto compatibility_hint = MetadataStorageFactory::getCompatibilityMetadataTypeHint(type);
auto metadata_type = MetadataStorageFactory::getMetadataType(config, config_prefix, compatibility_hint);
return metadataTypeFromString(metadata_type) == MetadataStorageType::Plain;
}
template <typename BaseObjectStorage, class ...Args>
ObjectStoragePtr createObjectStorage(
ObjectStorageType type,
const Poco::Util::AbstractConfiguration & config,
const std::string & config_prefix,
Args && ...args)
{
if (isPlainStorage(type, config, config_prefix))
{
return std::make_shared<PlainObjectStorage<BaseObjectStorage>>(std::forward<Args>(args)...);
}
else
{
return std::make_shared<BaseObjectStorage>(std::forward<Args>(args)...);
}
}
}
ObjectStorageFactory & ObjectStorageFactory::instance()
{
static ObjectStorageFactory factory;
@ -127,14 +159,14 @@ void registerS3ObjectStorage(ObjectStorageFactory & factory)
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
auto key_generator = getKeyGenerator(uri, config, config_prefix);
auto object_storage = std::make_shared<S3ObjectStorage>(
std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
auto object_storage = createObjectStorage<S3ObjectStorage>(
ObjectStorageType::S3, config, config_prefix, std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
checkS3Capabilities(*object_storage, s3_capabilities, name);
checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);
return object_storage;
});
@ -163,14 +195,14 @@ void registerS3PlainObjectStorage(ObjectStorageFactory & factory)
auto s3_capabilities = getCapabilitiesFromConfig(config, config_prefix);
auto settings = getSettings(config, config_prefix, context);
auto client = getClient(config, config_prefix, context, *settings);
auto key_generator = getKeyGenerator(disk_type, uri, config, config_prefix);
auto key_generator = getKeyGenerator(uri, config, config_prefix);
auto object_storage = std::make_shared<S3PlainObjectStorage>(
auto object_storage = std::make_shared<PlainObjectStorage<S3ObjectStorage>>(
std::move(client), std::move(settings), uri, s3_capabilities, key_generator, name);
/// NOTE: should we still perform this check for clickhouse-disks?
if (!skip_access_check)
checkS3Capabilities(*object_storage, s3_capabilities, name);
checkS3Capabilities(*dynamic_cast<S3ObjectStorage *>(object_storage.get()), s3_capabilities, name);
return object_storage;
});
@ -198,7 +230,7 @@ void registerHDFSObjectStorage(ObjectStorageFactory & factory)
context->getSettingsRef().hdfs_replication
);
return std::make_unique<HDFSObjectStorage>(uri, std::move(settings), config);
return createObjectStorage<HDFSObjectStorage>(ObjectStorageType::HDFS, config, config_prefix, uri, std::move(settings), config);
});
}
#endif
@ -214,12 +246,11 @@ void registerAzureObjectStorage(ObjectStorageFactory & factory)
bool /* skip_access_check */) -> ObjectStoragePtr
{
AzureBlobStorageEndpoint endpoint = processAzureBlobStorageEndpoint(config, config_prefix);
return std::make_unique<AzureObjectStorage>(
name,
return createObjectStorage<AzureObjectStorage>(
ObjectStorageType::Azure, config, config_prefix, name,
getAzureBlobContainerClient(config, config_prefix),
getAzureBlobStorageSettings(config, config_prefix, context),
endpoint.prefix.empty() ? endpoint.container_name : endpoint.container_name + "/" + endpoint.prefix);
};
factory.registerObjectStorageType("azure_blob_storage", creator);
factory.registerObjectStorageType("azure", creator);
@ -250,7 +281,7 @@ void registerWebObjectStorage(ObjectStorageFactory & factory)
ErrorCodes::BAD_ARGUMENTS, "Bad URI: `{}`. Error: {}", uri, e.what());
}
return std::make_shared<WebObjectStorage>(uri, context);
return createObjectStorage<WebObjectStorage>(ObjectStorageType::Web, config, config_prefix, uri, context);
});
}
@ -268,7 +299,7 @@ void registerLocalObjectStorage(ObjectStorageFactory & factory)
loadDiskLocalConfig(name, config, config_prefix, context, object_key_prefix, keep_free_space_bytes);
/// keys are mapped to the fs, object_key_prefix is a directory also
fs::create_directories(object_key_prefix);
return std::make_shared<LocalObjectStorage>(object_key_prefix);
return createObjectStorage<LocalObjectStorage>(ObjectStorageType::Local, config, config_prefix, object_key_prefix);
};
factory.registerObjectStorageType("local_blob_storage", creator);

View File

@ -0,0 +1,35 @@
#pragma once
#include <Disks/ObjectStorages/IObjectStorage.h>
#include <Common/ObjectStorageKeyGenerator.h>
namespace DB
{
/// Do not encode keys, store as-is, and do not require separate disk for metadata.
/// But because of this does not support renames/hardlinks/attrs/...
///
/// NOTE: This disk has excessive API calls.
template <typename BaseObjectStorage>
class PlainObjectStorage : public BaseObjectStorage
{
public:
template <class ...Args>
explicit PlainObjectStorage(Args && ...args)
: BaseObjectStorage(std::forward<Args>(args)...) {}
std::string getName() const override { return "" + BaseObjectStorage::getName(); }
/// Notes:
/// - supports BACKUP to this disk
/// - does not support INSERT into MergeTree table on this disk
bool isWriteOnce() const override { return true; }
bool isPlain() const override { return true; }
ObjectStorageKey generateObjectKeyForPath(const std::string & path) const override
{
return ObjectStorageKey::createAsRelative(BaseObjectStorage::getCommonKeyPrefix(), path);
}
};
}

View File

@ -10,25 +10,6 @@ namespace DB
void registerObjectStorages();
void registerMetadataStorages();
static std::string getCompatibilityMetadataTypeHint(const ObjectStorageType & type)
{
switch (type)
{
case ObjectStorageType::S3:
case ObjectStorageType::HDFS:
case ObjectStorageType::Local:
case ObjectStorageType::Azure:
return "local";
case ObjectStorageType::S3_Plain:
return "plain";
case ObjectStorageType::Web:
return "web";
case ObjectStorageType::None:
return "";
}
UNREACHABLE();
}
void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_check)
{
registerObjectStorages();
@ -47,7 +28,10 @@ void registerDiskObjectStorage(DiskFactory & factory, bool global_skip_access_ch
std::string compatibility_metadata_type_hint;
if (!config.has(config_prefix + ".metadata_type"))
{
compatibility_metadata_type_hint = getCompatibilityMetadataTypeHint(object_storage->getType());
if (object_storage->isPlain())
compatibility_metadata_type_hint = "plain";
else
compatibility_metadata_type_hint = MetadataStorageFactory::getCompatibilityMetadataTypeHint(object_storage->getType());
}
auto metadata_storage = MetadataStorageFactory::instance().create(

View File

@ -15,16 +15,10 @@ namespace ErrorCodes
}
ObjectStorageKeysGeneratorPtr getKeyGenerator(
String type,
const S3::URI & uri,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix)
{
if (type == "s3_plain")
return createObjectStorageKeysGeneratorAsIsWithPrefix(uri.key);
chassert(type == "s3");
bool storage_metadata_write_full_object_key = DiskObjectStorageMetadata::getWriteFullObjectKeySetting();
bool send_metadata = config.getBool(config_prefix + ".send_metadata", false);

View File

@ -12,7 +12,6 @@ namespace DB
namespace S3 { struct URI; }
ObjectStorageKeysGeneratorPtr getKeyGenerator(
String type,
const S3::URI & uri,
const Poco::Util::AbstractConfiguration & config,
const String & config_prefix);

View File

@ -48,6 +48,7 @@ namespace ErrorCodes
{
extern const int S3_ERROR;
extern const int BAD_ARGUMENTS;
extern const int LOGICAL_ERROR;
}
namespace
@ -562,6 +563,8 @@ std::unique_ptr<IObjectStorage> S3ObjectStorage::cloneObjectStorage(
ObjectStorageKey S3ObjectStorage::generateObjectKeyForPath(const std::string & path) const
{
if (!key_generator)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Key generator is not set");
return key_generator->generate(path);
}

View File

@ -182,27 +182,6 @@ private:
LoggerPtr log;
};
/// Do not encode keys, store as-is, and do not require separate disk for metadata.
/// But because of this does not support renames/hardlinks/attrs/...
///
/// NOTE: This disk has excessive API calls.
class S3PlainObjectStorage : public S3ObjectStorage
{
public:
std::string getName() const override { return "S3PlainObjectStorage"; }
template <class ...Args>
explicit S3PlainObjectStorage(Args && ...args)
: S3ObjectStorage("S3PlainObjectStorage", std::forward<Args>(args)...) {}
ObjectStorageType getType() const override { return ObjectStorageType::S3_Plain; }
/// Notes:
/// - supports BACKUP to this disk
/// - does not support INSERT into MergeTree table on this disk
bool isWriteOnce() const override { return true; }
};
}
#endif

View File

@ -86,6 +86,10 @@ WebObjectStorage::loadFiles(const String & path, const std::unique_lock<std::sha
loaded_files.emplace_back(file_path);
}
/// Check for not found url after read attempt, because of delayed initialization.
if (metadata_buf->hasNotFoundURL())
return {};
auto [it, inserted] = files.add(path, FileData::createDirectoryInfo(true));
if (!inserted)
{

View File

@ -1,10 +1,11 @@
#include <Formats/ReadSchemaUtils.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Common/assert_cast.h>
#include <IO/WithFileSize.h>
#include <IO/EmptyReadBuffer.h>
#include <IO/PeekableReadBuffer.h>
#include <IO/WithFileSize.h>
#include <Interpreters/Context.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Storages/IStorage.h>
#include <Common/assert_cast.h>
namespace DB
{

View File

@ -4561,7 +4561,7 @@ arguments, result_type, input_rows_count); \
if (from_low_cardinality)
{
const auto * col_low_cardinality = typeid_cast<const ColumnLowCardinality *>(arguments[0].column.get());
const auto * col_low_cardinality = assert_cast<const ColumnLowCardinality *>(arguments[0].column.get());
if (skip_not_null_check && col_low_cardinality->containsNull())
throw Exception(ErrorCodes::CANNOT_INSERT_NULL_IN_ORDINARY_COLUMN, "Cannot convert NULL value to non-Nullable type");
@ -4586,7 +4586,7 @@ arguments, result_type, input_rows_count); \
if (to_low_cardinality)
{
auto res_column = to_low_cardinality->createColumn();
auto * col_low_cardinality = typeid_cast<ColumnLowCardinality *>(res_column.get());
auto * col_low_cardinality = assert_cast<ColumnLowCardinality *>(res_column.get());
if (from_low_cardinality && !src_converted_to_full_column)
{

View File

@ -324,12 +324,15 @@ public:
String getName() const override { return name; }
bool isVariadic() const override { return true; }
bool isShortCircuit(ShortCircuitSettings & settings, size_t /*number_of_arguments*/) const override
bool isShortCircuit(ShortCircuitSettings & settings, size_t number_of_arguments) const override
{
if constexpr (dictionary_get_function_type != DictionaryGetFunctionType::getOrDefault)
return false;
settings.arguments_with_disabled_lazy_execution.insert({0, 1, 2});
/// We execute lazily only last argument with default expression.
for (size_t i = 0; i != number_of_arguments - 1; ++i)
settings.arguments_with_disabled_lazy_execution.insert(i);
settings.enable_lazy_execution_for_common_descendants_of_arguments = false;
settings.force_enable_lazy_execution = false;
return true;

View File

@ -355,7 +355,7 @@ public:
{
arrays.emplace_back(
column_tuple->getColumnPtr(j),
recursiveRemoveLowCardinality(type_tuple.getElement(j)),
type_tuple.getElement(j),
array_with_type_and_name.name + "." + tuple_names[j]);
}
}
@ -363,7 +363,7 @@ public:
{
arrays.emplace_back(
column_array->getDataPtr(),
recursiveRemoveLowCardinality(array_type->getNestedType()),
array_type->getNestedType(),
array_with_type_and_name.name);
}

View File

@ -1007,8 +1007,13 @@ private:
if (!(*null_map)[row])
continue;
}
else if (!applyVisitor(FieldVisitorAccurateEquals(), arr[i], value))
continue;
else
{
if (null_map && (*null_map)[row])
continue;
if (!applyVisitor(FieldVisitorAccurateEquals(), arr[i], value))
continue;
}
ConcreteAction::apply(data[row], i);

View File

@ -4,6 +4,7 @@
#include <DataTypes/DataTypesNumber.h>
#include <Columns/ColumnArray.h>
#include <Columns/ColumnVector.h>
#include <Columns/ColumnsNumber.h>
#include <Columns/ColumnConst.h>
#include <Functions/FunctionFactory.h>
#include <Functions/IFunction.h>

View File

@ -13,7 +13,15 @@ using FunctionSin = FunctionMathUnary<UnaryFunctionVectorized<SinName, sin>>;
REGISTER_FUNCTION(Sin)
{
factory.registerFunction<FunctionSin>({}, FunctionFactory::CaseInsensitive);
factory.registerFunction<FunctionSin>(
FunctionDocumentation{
.description = "Returns the sine of the argument.",
.syntax = "sin(x)",
.arguments = {{"x", "The number whose sine will be returned. (U)Int*, Float* or Decimal*."}},
.returned_value = "The sine of x.",
.examples = {{.name = "simple", .query = "SELECT sin(1.23)", .result = "0.9424888019316975"}},
.categories{"Mathematical", "Trigonometric"}},
FunctionFactory::CaseInsensitive);
}
}

View File

@ -62,32 +62,17 @@ public:
{
}
/// Get the name of the function.
String getName() const override
{
return name;
}
/// Do not sleep during query analysis.
bool isSuitableForConstantFolding() const override
{
return false;
}
size_t getNumberOfArguments() const override
{
return 1;
}
String getName() const override { return name; }
bool isSuitableForConstantFolding() const override { return false; } /// Do not sleep during query analysis.
size_t getNumberOfArguments() const override { return 1; }
bool isSuitableForShortCircuitArgumentsExecution(const DataTypesWithConstInfo & /*arguments*/) const override { return true; }
DataTypePtr getReturnTypeImpl(const DataTypes & arguments) const override
{
WhichDataType which(arguments[0]);
if (!which.isFloat()
&& !which.isNativeUInt())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}, expected Float64",
if (!which.isFloat() && !which.isNativeUInt())
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument of function {}, expected UInt* or Float*",
arguments[0]->getName(), getName());
return std::make_shared<DataTypeUInt8>();

View File

@ -21,7 +21,35 @@ using FunctionSimpleJSONExtractBool = FunctionsStringSearch<ExtractParamImpl<Nam
REGISTER_FUNCTION(VisitParamExtractBool)
{
factory.registerFunction<FunctionSimpleJSONExtractBool>();
factory.registerFunction<FunctionSimpleJSONExtractBool>(FunctionDocumentation{
.description = "Parses a true/false value from the value of the field named field_name. The result is UInt8.",
.syntax = "simpleJSONExtractBool(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value
= R"(It returns 1 if the value of the field is true, 0 otherwise. This means this function will return 0 including (and not only) in the following cases:
- If the field doesn't exists.
- If the field contains true as a string, e.g.: {"field":"true"}.
- If the field contains 1 as a numerical value.)",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":false,"bar":true}');
INSERT INTO jsons VALUES ('{"foo":"true","qux":1}');
SELECT simpleJSONExtractBool(json, 'bar') FROM jsons ORDER BY json;
SELECT simpleJSONExtractBool(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(0
1
0
0)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractBool", "simpleJSONExtractBool");
}

View File

@ -11,7 +11,36 @@ using FunctionSimpleJSONExtractFloat = FunctionsStringSearch<ExtractParamImpl<Na
REGISTER_FUNCTION(VisitParamExtractFloat)
{
factory.registerFunction<FunctionSimpleJSONExtractFloat>();
factory.registerFunction<FunctionSimpleJSONExtractFloat>(FunctionDocumentation{
.description
= "Parses Float64 from the value of the field named field_name. If this is a string field, it tries to parse a number from the "
"beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns 0.",
.syntax = "simpleJSONExtractFloat(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value = "It returns the number parsed from the field if the field exists and contains a number, 0 otherwise.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractFloat(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(0
-4000
0
-3.4
5)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractFloat", "simpleJSONExtractFloat");
}

View File

@ -11,7 +11,36 @@ using FunctionSimpleJSONExtractInt = FunctionsStringSearch<ExtractParamImpl<Name
REGISTER_FUNCTION(VisitParamExtractInt)
{
factory.registerFunction<FunctionSimpleJSONExtractInt>();
factory.registerFunction<FunctionSimpleJSONExtractInt>(FunctionDocumentation{
.description
= "Parses Int64 from the value of the field named field_name. If this is a string field, it tries to parse a number from the "
"beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns 0.",
.syntax = "simpleJSONExtractInt(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value = "It returns the number parsed from the field if the field exists and contains a number, 0 otherwise.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractInt(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(0
-4
0
-3
5)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractInt", "simpleJSONExtractInt");
}

View File

@ -61,7 +61,35 @@ using FunctionSimpleJSONExtractRaw = FunctionsStringSearchToString<ExtractParamT
REGISTER_FUNCTION(VisitParamExtractRaw)
{
factory.registerFunction<FunctionSimpleJSONExtractRaw>();
factory.registerFunction<FunctionSimpleJSONExtractRaw>(FunctionDocumentation{
.description = "Returns the value of the field named field_name as a String, including separators.",
.syntax = "simpleJSONExtractRaw(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value
= "It returns the value of the field as a String including separators if the field exists, or an empty String otherwise.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"-4e3"}');
INSERT INTO jsons VALUES ('{"foo":-3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":{"def":[1,2,3]}}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractRaw(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(
"-4e3"
-3.4
5
{"def":[1,2,3]})"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractRaw", "simpleJSONExtractRaw");
}

View File

@ -22,7 +22,35 @@ using FunctionSimpleJSONExtractString = FunctionsStringSearchToString<ExtractPar
REGISTER_FUNCTION(VisitParamExtractString)
{
factory.registerFunction<FunctionSimpleJSONExtractString>();
factory.registerFunction<FunctionSimpleJSONExtractString>(FunctionDocumentation{
.description = R"(Parses String in double quotes from the value of the field named field_name.
There is currently no support for code points in the format \uXXXX\uYYYY that are not from the basic multilingual plane (they are converted to CESU-8 instead of UTF-8).)",
.syntax = "simpleJSONExtractString(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value = "It returns the value of a field as a String, including separators. The value is unescaped. It returns an empty "
"String: if the field doesn't contain a double quoted string, if unescaping fails or if the field doesn't exist.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"\\n\\u0000"}');
INSERT INTO jsons VALUES ('{"foo":"\\u263"}');
INSERT INTO jsons VALUES ('{"foo":"\\u263a"}');
INSERT INTO jsons VALUES ('{"foo":"hello}');
SELECT simpleJSONExtractString(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(\n\0
)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractString", "simpleJSONExtractString");
}

View File

@ -12,7 +12,36 @@ using FunctionSimpleJSONExtractUInt = FunctionsStringSearch<ExtractParamImpl<Nam
REGISTER_FUNCTION(VisitParamExtractUInt)
{
factory.registerFunction<FunctionSimpleJSONExtractUInt>();
factory.registerFunction<FunctionSimpleJSONExtractUInt>(FunctionDocumentation{
.description
= "Parses UInt64 from the value of the field named field_name. If this is a string field, it tries to parse a number from the "
"beginning of the string. If the field does not exist, or it exists but does not contain a number, it returns 0.",
.syntax = "simpleJSONExtractUInt(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value = "It returns the number parsed from the field if the field exists and contains a number, 0 otherwise.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"4e3"}');
INSERT INTO jsons VALUES ('{"foo":3.4}');
INSERT INTO jsons VALUES ('{"foo":5}');
INSERT INTO jsons VALUES ('{"foo":"not1number"}');
INSERT INTO jsons VALUES ('{"baz":2}');
SELECT simpleJSONExtractUInt(json, 'foo') FROM jsons ORDER BY json;)",
.result = R"(0
4
0
3
5)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamExtractUInt", "simpleJSONExtractUInt");
}

View File

@ -21,7 +21,28 @@ using FunctionSimpleJSONHas = FunctionsStringSearch<ExtractParamImpl<NameSimpleJ
REGISTER_FUNCTION(VisitParamHas)
{
factory.registerFunction<FunctionSimpleJSONHas>();
factory.registerFunction<FunctionSimpleJSONHas>(FunctionDocumentation{
.description = "Checks whether there is a field named field_name. The result is UInt8.",
.syntax = "simpleJSONHas(json, field_name)",
.arguments
= {{"json", "The JSON in which the field is searched for. String."},
{"field_name", "The name of the field to search for. String literal."}},
.returned_value = "It returns 1 if the field exists, 0 otherwise.",
.examples
= {{.name = "simple",
.query = R"(CREATE TABLE jsons
(
json String
)
ENGINE = Memory;
INSERT INTO jsons VALUES ('{"foo":"true","qux":1}');
SELECT simpleJSONHas(json, 'foo') FROM jsons;
SELECT simpleJSONHas(json, 'bar') FROM jsons;)",
.result = R"(1
0)"}},
.categories{"JSON"}});
factory.registerAlias("visitParamHas", "simpleJSONHas");
}

View File

@ -100,6 +100,7 @@ struct ReadSettings
bool read_from_filesystem_cache_if_exists_otherwise_bypass_cache = false;
bool enable_filesystem_cache_log = false;
size_t filesystem_cache_segments_batch_size = 20;
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;

View File

@ -449,6 +449,7 @@ bool ReadWriteBufferFromHTTP::nextImpl()
if (http_skip_not_found_url && e.getHTTPStatus() == Poco::Net::HTTPResponse::HTTPStatus::HTTP_NOT_FOUND)
{
next_result = false;
has_not_found_url = true;
return;
}
@ -740,4 +741,3 @@ ReadWriteBufferFromHTTP::HTTPFileInfo ReadWriteBufferFromHTTP::parseFileInfo(con
}
}

View File

@ -79,6 +79,7 @@ private:
const bool use_external_buffer;
const bool http_skip_not_found_url;
bool has_not_found_url = false;
std::function<void(std::ostream &)> out_stream_callback;
@ -183,6 +184,8 @@ public:
std::optional<time_t> tryGetLastModificationTime();
bool hasNotFoundURL() const { return has_not_found_url; }
HTTPFileInfo getFileInfo();
static HTTPFileInfo parseFileInfo(const Poco::Net::HTTPResponse & response, size_t requested_range_begin);
};

View File

@ -20,6 +20,7 @@ struct WriteSettings
bool enable_filesystem_cache_on_write_operations = false;
bool enable_filesystem_cache_log = false;
bool throw_on_error_from_cache = false;
size_t filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = 1000;
bool s3_allow_parallel_part_upload = true;

View File

@ -1111,6 +1111,7 @@ void NO_INLINE Aggregator::executeImpl(
bool all_keys_are_const,
AggregateDataPtr overflow_row) const
{
bool use_compiled_functions = false;
if (!no_more_keys)
{
/// Prefetching doesn't make sense for small hash tables, because they fit in caches entirely.
@ -1118,33 +1119,47 @@ void NO_INLINE Aggregator::executeImpl(
&& (method.data.getBufferSizeInBytes() > min_bytes_for_prefetch);
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions))
{
if (prefetch)
executeImplBatch<false, true, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, true, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
else
use_compiled_functions = compiled_aggregate_functions_holder && !hasSparseArguments(aggregate_instructions);
#endif
{
if (prefetch)
executeImplBatch<false, false, true>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
else
executeImplBatch<false, false, false>(
method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
}
if (prefetch)
executeImplBatch<false, true>(
method,
state,
aggregates_pool,
row_begin,
row_end,
aggregate_instructions,
all_keys_are_const,
use_compiled_functions,
overflow_row);
else
executeImplBatch<false, false>(
method,
state,
aggregates_pool,
row_begin,
row_end,
aggregate_instructions,
all_keys_are_const,
use_compiled_functions,
overflow_row);
}
else
{
executeImplBatch<true, false, false>(method, state, aggregates_pool, row_begin, row_end, aggregate_instructions, all_keys_are_const, overflow_row);
executeImplBatch<true, false>(
method,
state,
aggregates_pool,
row_begin,
row_end,
aggregate_instructions,
all_keys_are_const,
use_compiled_functions,
overflow_row);
}
}
template <bool no_more_keys, bool use_compiled_functions, bool prefetch, typename Method, typename State>
template <bool no_more_keys, bool prefetch, typename Method, typename State>
void NO_INLINE Aggregator::executeImplBatch(
Method & method,
State & state,
@ -1153,6 +1168,7 @@ void NO_INLINE Aggregator::executeImplBatch(
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
bool use_compiled_functions [[maybe_unused]],
AggregateDataPtr overflow_row) const
{
using KeyHolder = decltype(state.getKeyHolder(0, std::declval<Arena &>()));
@ -1284,7 +1300,7 @@ void NO_INLINE Aggregator::executeImplBatch(
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
const auto & compiled_aggregate_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
compiled_aggregate_functions.create_aggregate_states_function(aggregate_data);
@ -1293,20 +1309,6 @@ void NO_INLINE Aggregator::executeImplBatch(
static constexpr bool skip_compiled_aggregate_functions = true;
createAggregateStates<skip_compiled_aggregate_functions>(aggregate_data);
}
#if defined(MEMORY_SANITIZER)
/// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
{
if (!is_aggregate_function_compiled[aggregate_function_index])
continue;
auto aggregate_data_with_offset = aggregate_data + offsets_of_aggregate_states[aggregate_function_index];
auto data_size = params.aggregates[aggregate_function_index].function->sizeOfData();
__msan_unpoison(aggregate_data_with_offset, data_size);
}
#endif
}
else
#endif
@ -1339,7 +1341,7 @@ void NO_INLINE Aggregator::executeImplBatch(
}
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
@ -1372,9 +1374,8 @@ void NO_INLINE Aggregator::executeImplBatch(
for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (is_aggregate_function_compiled[i])
continue;
if (use_compiled_functions && is_aggregate_function_compiled[i])
continue;
#endif
AggregateFunctionInstruction * inst = aggregate_instructions + i;
@ -1387,18 +1388,19 @@ void NO_INLINE Aggregator::executeImplBatch(
}
template <bool use_compiled_functions>
void NO_INLINE Aggregator::executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin, size_t row_end,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const
Arena * arena,
bool use_compiled_functions [[maybe_unused]]) const
{
if (row_begin == row_end)
return;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
std::vector<ColumnData> columns_data;
@ -1418,20 +1420,6 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
auto add_into_aggregate_states_function_single_place = compiled_aggregate_functions_holder->compiled_aggregate_functions.add_into_aggregate_states_function_single_place;
add_into_aggregate_states_function_single_place(row_begin, row_end, columns_data.data(), res);
#if defined(MEMORY_SANITIZER)
/// We compile only functions that do not allocate some data in Arena. Only store necessary state in AggregateData place.
for (size_t aggregate_function_index = 0; aggregate_function_index < aggregate_functions.size(); ++aggregate_function_index)
{
if (!is_aggregate_function_compiled[aggregate_function_index])
continue;
auto aggregate_data_with_offset = res + offsets_of_aggregate_states[aggregate_function_index];
auto data_size = params.aggregates[aggregate_function_index].function->sizeOfData();
__msan_unpoison(aggregate_data_with_offset, data_size);
}
#endif
}
#endif
@ -1439,13 +1427,10 @@ void NO_INLINE Aggregator::executeWithoutKeyImpl(
for (size_t i = 0; i < aggregate_functions.size(); ++i)
{
AggregateFunctionInstruction * inst = aggregate_instructions + i;
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (is_aggregate_function_compiled[i])
continue;
if (use_compiled_functions && is_aggregate_function_compiled[i])
continue;
#endif
addBatchSinglePlace(row_begin, row_end, inst, res + inst->state_offset, arena);
}
}
@ -1704,16 +1689,14 @@ bool Aggregator::executeOnBlock(Columns columns,
if (result.type == AggregatedDataVariants::Type::without_key)
{
/// TODO: Enable compilation after investigation
// #if USE_EMBEDDED_COMPILER
// if (compiled_aggregate_functions_holder)
// {
// executeWithoutKeyImpl<true>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
// }
// else
// #endif
{
executeWithoutKeyImpl<false>(result.without_key, row_begin, row_end, aggregate_functions_instructions.data(), result.aggregates_pool);
}
bool use_compiled_functions = false;
executeWithoutKeyImpl(
result.without_key,
row_begin,
row_end,
aggregate_functions_instructions.data(),
result.aggregates_pool,
use_compiled_functions);
}
else
{
@ -1965,19 +1948,13 @@ Aggregator::convertToBlockImpl(Method & method, Table & data, Arena * arena, Are
ConvertToBlockRes<return_single_block> res;
bool use_compiled_functions = false;
if (final)
{
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
static constexpr bool use_compiled_functions = !Method::low_cardinality_optimization;
res = convertToBlockImplFinal<Method, use_compiled_functions, return_single_block>(method, data, arena, aggregates_pools, rows);
}
else
use_compiled_functions = compiled_aggregate_functions_holder != nullptr && !Method::low_cardinality_optimization;
#endif
{
res = convertToBlockImplFinal<Method, false, return_single_block>(method, data, arena, aggregates_pools, rows);
}
res = convertToBlockImplFinal<Method, return_single_block>(method, data, arena, aggregates_pools, use_compiled_functions, rows);
}
else
{
@ -2059,8 +2036,12 @@ inline void Aggregator::insertAggregatesIntoColumns(Mapped & mapped, MutableColu
}
template <bool use_compiled_functions>
Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena, bool has_null_key_data [[maybe_unused]]) const
Block Aggregator::insertResultsIntoColumns(
PaddedPODArray<AggregateDataPtr> & places,
OutputBlockColumns && out_cols,
Arena * arena,
bool has_null_key_data [[maybe_unused]],
bool use_compiled_functions [[maybe_unused]]) const
{
std::exception_ptr exception;
size_t aggregate_functions_destroy_index = 0;
@ -2068,7 +2049,7 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
try
{
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
/** For JIT compiled functions we need to resize columns before pass them into compiled code.
* insert_aggregates_into_columns_function function does not throw exception.
@ -2098,14 +2079,13 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
for (; aggregate_functions_destroy_index < params.aggregates_size;)
{
if constexpr (use_compiled_functions)
#if USE_EMBEDDED_COMPILER
if (use_compiled_functions && is_aggregate_function_compiled[aggregate_functions_destroy_index])
{
if (is_aggregate_function_compiled[aggregate_functions_destroy_index])
{
++aggregate_functions_destroy_index;
continue;
}
++aggregate_functions_destroy_index;
continue;
}
#endif
auto & final_aggregate_column = out_cols.final_aggregate_columns[aggregate_functions_destroy_index];
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
@ -2127,14 +2107,13 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
for (; aggregate_functions_destroy_index < params.aggregates_size; ++aggregate_functions_destroy_index)
{
if constexpr (use_compiled_functions)
#if USE_EMBEDDED_COMPILER
if (use_compiled_functions && is_aggregate_function_compiled[aggregate_functions_destroy_index])
{
if (is_aggregate_function_compiled[aggregate_functions_destroy_index])
{
++aggregate_functions_destroy_index;
continue;
}
++aggregate_functions_destroy_index;
continue;
}
#endif
size_t offset = offsets_of_aggregate_states[aggregate_functions_destroy_index];
aggregate_functions[aggregate_functions_destroy_index]->destroyBatch(0, places.size(), places.data(), offset);
@ -2146,9 +2125,9 @@ Block Aggregator::insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & pl
return finalizeBlock(params, getHeader(/* final */ true), std::move(out_cols), /* final */ true, places.size());
}
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE
Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t) const
template <typename Method, bool return_single_block, typename Table>
Aggregator::ConvertToBlockRes<return_single_block> NO_INLINE Aggregator::convertToBlockImplFinal(
Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool use_compiled_functions [[maybe_unused]], size_t) const
{
/// +1 for nullKeyData, if `data` doesn't have it - not a problem, just some memory for one excessive row will be preallocated
const size_t max_block_size = (return_single_block ? data.size() : std::min(params.max_block_size, data.size())) + 1;
@ -2204,7 +2183,8 @@ Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena
{
if (places.size() >= max_block_size)
{
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data));
res.emplace_back(
insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
places.clear();
out_cols.reset();
has_null_key_data = false;
@ -2214,12 +2194,13 @@ Aggregator::convertToBlockImplFinal(Method & method, Table & data, Arena * arena
if constexpr (return_single_block)
{
return insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data);
return insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions);
}
else
{
if (out_cols.has_value())
res.emplace_back(insertResultsIntoColumns<use_compiled_functions>(places, std::move(out_cols.value()), arena, has_null_key_data));
res.emplace_back(
insertResultsIntoColumns(places, std::move(out_cols.value()), arena, has_null_key_data, use_compiled_functions));
return res;
}
}
@ -2609,8 +2590,9 @@ void NO_INLINE Aggregator::mergeDataNullKey(
}
}
template <typename Method, bool use_compiled_functions, bool prefetch, typename Table>
void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const
template <typename Method, bool prefetch, typename Table>
void NO_INLINE
Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions [[maybe_unused]]) const
{
if constexpr (Method::low_cardinality_optimization || Method::one_key_nullable_optimization)
mergeDataNullKey<Method, Table>(table_dst, table_src, arena);
@ -2637,7 +2619,7 @@ void NO_INLINE Aggregator::mergeDataImpl(Table & table_dst, Table & table_src, A
table_src.clearAndShrink();
#if USE_EMBEDDED_COMPILER
if constexpr (use_compiled_functions)
if (use_compiled_functions)
{
const auto & compiled_functions = compiled_aggregate_functions_holder->compiled_aggregate_functions;
compiled_functions.merge_aggregate_states_function(dst_places.data(), src_places.data(), dst_places.size());
@ -2787,26 +2769,16 @@ void NO_INLINE Aggregator::mergeSingleLevelDataImpl(
if (!no_more_keys)
{
bool use_compiled_functions = false;
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
if (prefetch)
mergeDataImpl<Method, true, true>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
}
else
use_compiled_functions = compiled_aggregate_functions_holder != nullptr;
#endif
{
if (prefetch)
mergeDataImpl<Method, false, true>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool);
}
if (prefetch)
mergeDataImpl<Method, true>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, use_compiled_functions);
else
mergeDataImpl<Method, false>(
getDataVariant<Method>(*res).data, getDataVariant<Method>(current).data, res->aggregates_pool, use_compiled_functions);
}
else if (res->without_key)
{
@ -2851,26 +2823,22 @@ void NO_INLINE Aggregator::mergeBucketImpl(
return;
AggregatedDataVariants & current = *data[result_num];
bool use_compiled_functions = false;
#if USE_EMBEDDED_COMPILER
if (compiled_aggregate_functions_holder)
{
if (prefetch)
mergeDataImpl<Method, true, true>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
else
mergeDataImpl<Method, true, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
}
else
use_compiled_functions = compiled_aggregate_functions_holder != nullptr;
#endif
{
if (prefetch)
mergeDataImpl<Method, false, true>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
else
mergeDataImpl<Method, false, false>(
getDataVariant<Method>(*res).data.impls[bucket], getDataVariant<Method>(current).data.impls[bucket], arena);
}
if (prefetch)
mergeDataImpl<Method, true>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena,
use_compiled_functions);
else
mergeDataImpl<Method, false>(
getDataVariant<Method>(*res).data.impls[bucket],
getDataVariant<Method>(current).data.impls[bucket],
arena,
use_compiled_functions);
}
}
@ -2938,11 +2906,12 @@ ManyAggregatedDataVariants Aggregator::prepareVariantsToMerge(ManyAggregatedData
return non_empty_data;
}
template <bool no_more_keys, typename State, typename Table>
template <typename State, typename Table>
void NO_INLINE Aggregator::mergeStreamsImplCase(
Arena * aggregates_pool,
State & state,
Table & data,
bool no_more_keys,
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,
@ -2954,36 +2923,34 @@ void NO_INLINE Aggregator::mergeStreamsImplCase(
if (!arena_for_keys)
arena_for_keys = aggregates_pool;
for (size_t i = row_begin; i < row_end; ++i)
if (no_more_keys)
{
AggregateDataPtr aggregate_data = nullptr;
if constexpr (!no_more_keys)
for (size_t i = row_begin; i < row_end; i++)
{
auto emplace_result = state.emplaceKey(data, i, *arena_for_keys); // NOLINT
if (emplace_result.isInserted())
auto find_result = state.findKey(data, i, *arena_for_keys);
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
AggregateDataPtr value = find_result.isFound() ? find_result.getMapped() : overflow_row;
places[i] = value;
}
}
else
{
for (size_t i = row_begin; i < row_end; i++)
{
auto emplace_result = state.emplaceKey(data, i, *arena_for_keys);
if (!emplace_result.isInserted())
places[i] = emplace_result.getMapped();
else
{
emplace_result.setMapped(nullptr);
aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
AggregateDataPtr aggregate_data = aggregates_pool->alignedAlloc(total_size_of_aggregate_states, align_aggregate_states);
createAggregateStates(aggregate_data);
emplace_result.setMapped(aggregate_data);
places[i] = aggregate_data;
}
else
aggregate_data = emplace_result.getMapped();
}
else
{
auto find_result = state.findKey(data, i, *arena_for_keys);
if (find_result.isFound())
aggregate_data = find_result.getMapped();
}
/// aggregate_date == nullptr means that the new key did not fit in the hash table because of no_more_keys.
AggregateDataPtr value = aggregate_data ? aggregate_data : overflow_row;
places[i] = value;
}
for (size_t j = 0; j < params.aggregates_size; ++j)
@ -3037,22 +3004,16 @@ void NO_INLINE Aggregator::mergeStreamsImpl(
if (use_cache)
{
typename Method::State state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
mergeStreamsImplCase(
aggregates_pool, state, data, no_more_keys, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
consecutive_keys_cache_stats.update(row_end - row_begin, state.getCacheMissesSinceLastReset());
}
else
{
typename Method::StateNoCache state(key_columns, key_sizes, aggregation_state_cache);
if (!no_more_keys)
mergeStreamsImplCase<false>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
else
mergeStreamsImplCase<true>(aggregates_pool, state, data, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
mergeStreamsImplCase(
aggregates_pool, state, data, no_more_keys, overflow_row, row_begin, row_end, aggregate_columns_data, arena_for_keys);
}
}

View File

@ -1395,7 +1395,7 @@ private:
AggregateDataPtr overflow_row) const;
/// Specialization for a particular value no_more_keys.
template <bool no_more_keys, bool use_compiled_functions, bool prefetch, typename Method, typename State>
template <bool no_more_keys, bool prefetch, typename Method, typename State>
void executeImplBatch(
Method & method,
State & state,
@ -1404,16 +1404,17 @@ private:
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
bool all_keys_are_const,
bool use_compiled_functions,
AggregateDataPtr overflow_row) const;
/// For case when there are no keys (all aggregate into one row).
template <bool use_compiled_functions>
void executeWithoutKeyImpl(
AggregatedDataWithoutKey & res,
size_t row_begin,
size_t row_end,
AggregateFunctionInstruction * aggregate_instructions,
Arena * arena) const;
Arena * arena,
bool use_compiled_functions) const;
template <typename Method>
void writeToTemporaryFileImpl(
@ -1429,8 +1430,8 @@ private:
Arena * arena) const;
/// Merge data from hash table `src` into `dst`.
template <typename Method, bool use_compiled_functions, bool prefetch, typename Table>
void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena) const;
template <typename Method, bool prefetch, typename Table>
void mergeDataImpl(Table & table_dst, Table & table_src, Arena * arena, bool use_compiled_functions) const;
/// Merge data from hash table `src` into `dst`, but only for keys that already exist in dst. In other cases, merge the data into `overflows`.
template <typename Method, typename Table>
@ -1467,12 +1468,16 @@ private:
MutableColumns & final_aggregate_columns,
Arena * arena) const;
template <bool use_compiled_functions>
Block insertResultsIntoColumns(PaddedPODArray<AggregateDataPtr> & places, OutputBlockColumns && out_cols, Arena * arena, bool has_null_key_data) const;
Block insertResultsIntoColumns(
PaddedPODArray<AggregateDataPtr> & places,
OutputBlockColumns && out_cols,
Arena * arena,
bool has_null_key_data,
bool use_compiled_functions) const;
template <typename Method, bool use_compiled_functions, bool return_single_block, typename Table>
ConvertToBlockRes<return_single_block>
convertToBlockImplFinal(Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, size_t rows) const;
template <typename Method, bool return_single_block, typename Table>
ConvertToBlockRes<return_single_block> convertToBlockImplFinal(
Method & method, Table & data, Arena * arena, Arenas & aggregates_pools, bool use_compiled_functions, size_t rows) const;
template <bool return_single_block, typename Method, typename Table>
ConvertToBlockRes<return_single_block>
@ -1508,11 +1513,12 @@ private:
bool final,
ThreadPool * thread_pool) const;
template <bool no_more_keys, typename State, typename Table>
template <typename State, typename Table>
void mergeStreamsImplCase(
Arena * aggregates_pool,
State & state,
Table & data,
bool no_more_keys,
AggregateDataPtr overflow_row,
size_t row_begin,
size_t row_end,

View File

@ -27,6 +27,7 @@ namespace ProfileEvents
extern const Event FilesystemCacheReserveMicroseconds;
extern const Event FilesystemCacheGetOrSetMicroseconds;
extern const Event FilesystemCacheGetMicroseconds;
extern const Event FilesystemCacheFailToReserveSpaceBecauseOfLockContention;
}
namespace DB
@ -188,9 +189,9 @@ CacheGuard::Lock FileCache::lockCache() const
return cache_guard.lock();
}
CacheGuard::Lock FileCache::tryLockCache() const
CacheGuard::Lock FileCache::tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout) const
{
return cache_guard.tryLock();
return acquire_timeout.has_value() ? cache_guard.tryLockFor(acquire_timeout.value()) : cache_guard.tryLock();
}
FileSegments FileCache::getImpl(const LockedKey & locked_key, const FileSegment::Range & range, size_t file_segments_limit) const
@ -776,12 +777,18 @@ bool FileCache::tryReserve(
FileSegment & file_segment,
const size_t size,
FileCacheReserveStat & reserve_stat,
const UserInfo & user)
const UserInfo & user,
size_t lock_wait_timeout_milliseconds)
{
ProfileEventTimeIncrement<Microseconds> watch(ProfileEvents::FilesystemCacheReserveMicroseconds);
assertInitialized();
auto cache_lock = lockCache();
auto cache_lock = tryLockCache(std::chrono::milliseconds(lock_wait_timeout_milliseconds));
if (!cache_lock)
{
ProfileEvents::increment(ProfileEvents::FilesystemCacheFailToReserveSpaceBecauseOfLockContention);
return false;
}
LOG_TEST(
log, "Trying to reserve space ({} bytes) for {}:{}, current usage {}/{}",

View File

@ -161,7 +161,8 @@ public:
FileSegment & file_segment,
size_t size,
FileCacheReserveStat & stat,
const UserInfo & user);
const UserInfo & user,
size_t lock_wait_timeout_milliseconds);
std::vector<FileSegment::Info> getFileSegmentInfos(const UserID & user_id);
@ -173,7 +174,7 @@ public:
void deactivateBackgroundOperations();
CacheGuard::Lock lockCache() const;
CacheGuard::Lock tryLockCache() const;
CacheGuard::Lock tryLockCache(std::optional<std::chrono::milliseconds> acquire_timeout = std::nullopt) const;
std::vector<FileSegment::Info> sync();

View File

@ -497,7 +497,7 @@ LockedKeyPtr FileSegment::lockKeyMetadata(bool assert_exists) const
return metadata->tryLock();
}
bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat)
bool FileSegment::reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat)
{
if (!size_to_reserve)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Zero space reservation is not allowed");
@ -549,7 +549,7 @@ bool FileSegment::reserve(size_t size_to_reserve, FileCacheReserveStat * reserve
if (!reserve_stat)
reserve_stat = &dummy_stat;
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user);
bool reserved = cache->tryReserve(*this, size_to_reserve, *reserve_stat, getKeyMetadata()->user, lock_wait_timeout_milliseconds);
if (!reserved)
setDownloadFailedUnlocked(lockFileSegment());

View File

@ -199,7 +199,7 @@ public:
/// Try to reserve exactly `size` bytes (in addition to the getDownloadedSize() bytes already downloaded).
/// Returns true if reservation was successful, false otherwise.
bool reserve(size_t size_to_reserve, FileCacheReserveStat * reserve_stat = nullptr);
bool reserve(size_t size_to_reserve, size_t lock_wait_timeout_milliseconds, FileCacheReserveStat * reserve_stat = nullptr);
/// Write data into reserved space.
void write(const char * from, size_t size, size_t offset);

View File

@ -61,17 +61,26 @@ namespace DB
*/
struct CacheGuard : private boost::noncopyable
{
using Mutex = std::timed_mutex;
/// struct is used (not keyword `using`) to make CacheGuard::Lock non-interchangable with other guards locks
/// so, we wouldn't be able to pass CacheGuard::Lock to a function which accepts KeyGuard::Lock, for example
struct Lock : public std::unique_lock<std::mutex>
struct Lock : public std::unique_lock<Mutex>
{
using Base = std::unique_lock<std::mutex>;
using Base = std::unique_lock<Mutex>;
using Base::Base;
};
Lock lock() { return Lock(mutex); }
Lock tryLock() { return Lock(mutex, std::try_to_lock); }
std::mutex mutex;
Lock tryLockFor(const std::chrono::milliseconds & acquire_timeout)
{
return Lock(mutex, std::chrono::duration<double, std::milli>(acquire_timeout));
}
private:
Mutex mutex;
};
/**

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/Metadata.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Context.h>
#include <Common/logger_useful.h>
#include <Common/ElapsedTimeProfileEventIncrement.h>
#include <filesystem>
@ -693,6 +694,9 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
reader->set(memory->data(), memory->size());
}
const auto reserve_space_lock_wait_timeout_milliseconds =
Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
size_t offset = file_segment.getCurrentWriteOffset();
if (offset != static_cast<size_t>(reader->getPosition()))
reader->seek(offset, SEEK_SET);
@ -701,7 +705,7 @@ void CacheMetadata::downloadImpl(FileSegment & file_segment, std::optional<Memor
{
auto size = reader->available();
if (!file_segment.reserve(size))
if (!file_segment.reserve(size, reserve_space_lock_wait_timeout_milliseconds))
{
LOG_TEST(
log, "Failed to reserve space during background download "

View File

@ -1,6 +1,7 @@
#include <Interpreters/Cache/WriteBufferToFileSegment.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Interpreters/Cache/FileCache.h>
#include <Interpreters/Context.h>
#include <IO/SwapHelper.h>
#include <IO/ReadBufferFromFile.h>
@ -18,9 +19,22 @@ namespace ErrorCodes
extern const int NOT_ENOUGH_SPACE;
}
namespace
{
size_t getCacheLockWaitTimeout()
{
auto query_context = CurrentThread::getQueryContext();
if (query_context)
return query_context->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
else
return Context::getGlobalContextInstance()->getReadSettings().filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
}
}
WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegment * file_segment_)
: WriteBufferFromFileDecorator(std::make_unique<WriteBufferFromFile>(file_segment_->getPath()))
, file_segment(file_segment_)
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
@ -31,6 +45,7 @@ WriteBufferToFileSegment::WriteBufferToFileSegment(FileSegmentsHolderPtr segment
: throw Exception(ErrorCodes::LOGICAL_ERROR, "WriteBufferToFileSegment can be created only from single segment"))
, file_segment(&segment_holder_->front())
, segment_holder(std::move(segment_holder_))
, reserve_space_lock_wait_timeout_milliseconds(getCacheLockWaitTimeout())
{
}
@ -49,7 +64,7 @@ void WriteBufferToFileSegment::nextImpl()
FileCacheReserveStat reserve_stat;
/// In case of an error, we don't need to finalize the file segment
/// because it will be deleted soon and completed in the holder's destructor.
bool ok = file_segment->reserve(bytes_to_write, &reserve_stat);
bool ok = file_segment->reserve(bytes_to_write, reserve_space_lock_wait_timeout_milliseconds, &reserve_stat);
if (!ok)
{

View File

@ -28,6 +28,8 @@ private:
/// Empty if file_segment is not owned by this WriteBufferToFileSegment
FileSegmentsHolderPtr segment_holder;
const size_t reserve_space_lock_wait_timeout_milliseconds;
};

View File

@ -5166,6 +5166,7 @@ ReadSettings Context::getReadSettings() const
res.read_from_filesystem_cache_if_exists_otherwise_bypass_cache = settings.read_from_filesystem_cache_if_exists_otherwise_bypass_cache;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.filesystem_cache_segments_batch_size = settings.filesystem_cache_segments_batch_size;
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
res.filesystem_cache_max_download_size = settings.filesystem_cache_max_download_size;
res.skip_download_if_exceeds_query_cache = settings.skip_download_if_exceeds_query_cache;
@ -5214,6 +5215,7 @@ WriteSettings Context::getWriteSettings() const
res.enable_filesystem_cache_on_write_operations = settings.enable_filesystem_cache_on_write_operations;
res.enable_filesystem_cache_log = settings.enable_filesystem_cache_log;
res.throw_on_error_from_cache = settings.throw_on_error_from_cache_on_write_operations;
res.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds = settings.filesystem_cache_reserve_space_wait_lock_timeout_milliseconds;
res.s3_allow_parallel_part_upload = settings.s3_allow_parallel_part_upload;

View File

@ -1,15 +1,14 @@
#pragma once
#include <Core/UUID.h>
#include <Databases/IDatabase.h>
#include <Databases/TablesDependencyGraph.h>
#include <Interpreters/Context_fwd.h>
#include <Interpreters/StorageID.h>
#include <Databases/TablesDependencyGraph.h>
#include <Parsers/IAST_fwd.h>
#include <Storages/IStorage_fwd.h>
#include "Common/NamePrompter.h"
#include <Common/NamePrompter.h>
#include <Common/SharedMutex.h>
#include "Storages/IStorage.h"
#include "Databases/IDatabase.h"
#include <boost/noncopyable.hpp>
#include <Poco/Logger.h>

View File

@ -1,21 +1,18 @@
#include <Compression/CompressedWriteBuffer.h>
#include <Formats/NativeWriter.h>
#include <Formats/formatBlock.h>
#include <Interpreters/Context.h>
#include <Interpreters/GraceHashJoin.h>
#include <Interpreters/HashJoin.h>
#include <Interpreters/TableJoin.h>
#include <Formats/NativeWriter.h>
#include <Interpreters/TemporaryDataOnDisk.h>
#include <Compression/CompressedWriteBuffer.h>
#include <base/FnTraits.h>
#include <Common/formatReadable.h>
#include <Common/logger_useful.h>
#include <Common/thread_local_rng.h>
#include <base/FnTraits.h>
#include <fmt/format.h>
#include <Formats/formatBlock.h>
#include <numeric>
#include <fmt/format.h>
namespace CurrentMetrics

View File

@ -2552,7 +2552,12 @@ void InterpreterSelectQuery::executeFetchColumns(QueryProcessingStage::Enum proc
/// If necessary, we request more sources than the number of threads - to distribute the work evenly over the threads.
if (max_streams > 1 && !is_sync_remote)
max_streams = static_cast<size_t>(max_streams * settings.max_streams_to_max_threads_ratio);
{
if (auto streams_with_ratio = max_streams * settings.max_streams_to_max_threads_ratio; streams_with_ratio < SIZE_MAX)
max_streams = static_cast<size_t>(streams_with_ratio);
else
throw Exception(ErrorCodes::PARAMETER_OUT_OF_BOUND, "Exceeded limit for `max_streams` with `max_streams_to_max_threads_ratio`. Make sure that `max_streams * max_streams_to_max_threads_ratio` not exceeds {}, current value: {}", SIZE_MAX, streams_with_ratio);
}
auto & prewhere_info = analysis_result.prewhere_info;

View File

@ -2,11 +2,11 @@
#include <boost/noncopyable.hpp>
#include <Interpreters/Context.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Core/Block.h>
#include <Disks/IVolume.h>
#include <Common/CurrentMetrics.h>
#include <Disks/TemporaryFileOnDisk.h>
#include <Interpreters/Cache/FileSegment.h>
#include <Common/CurrentMetrics.h>
namespace CurrentMetrics

View File

@ -245,7 +245,7 @@ void download(FileSegment & file_segment)
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_TRUE(file_segment.reserve(file_segment.range().size()));
ASSERT_TRUE(file_segment.reserve(file_segment.range().size(), 1000));
download(cache_base_path, file_segment);
ASSERT_EQ(file_segment.state(), State::DOWNLOADING);
@ -257,7 +257,7 @@ void assertDownloadFails(FileSegment & file_segment)
{
ASSERT_EQ(file_segment.getOrSetDownloader(), FileSegment::getCallerId());
ASSERT_EQ(file_segment.getDownloadedSize(), 0);
ASSERT_FALSE(file_segment.reserve(file_segment.range().size()));
ASSERT_FALSE(file_segment.reserve(file_segment.range().size(), 1000));
file_segment.complete();
}
@ -956,7 +956,7 @@ TEST_F(FileCacheTest, temporaryData)
for (auto & segment : *some_data_holder)
{
ASSERT_TRUE(segment->getOrSetDownloader() == DB::FileSegment::getCallerId());
ASSERT_TRUE(segment->reserve(segment->range().size()));
ASSERT_TRUE(segment->reserve(segment->range().size(), 1000));
download(*segment);
segment->complete();
}

View File

@ -304,6 +304,9 @@ void Loggers::buildLoggers(Poco::Util::AbstractConfiguration & config, Poco::Log
log_settings.turn_off_logger = DB::TextLog::shouldTurnOffLogger();
log_settings.database = config.getString("text_log.database", "system");
log_settings.table = config.getString("text_log.table", "text_log");
split->addTextLog(DB::TextLog::getLogQueue(log_settings), text_log_level);
}
#endif

View File

@ -62,7 +62,7 @@ protected:
settings.ostr << '.';
}
chassert(table);
chassert(table != nullptr, "Table is empty for the ASTQueryWithTableAndOutputImpl.");
table->formatImpl(settings, state, frame);
}
};

View File

@ -85,6 +85,15 @@ using ASTShowCreateDictionaryQuery = ASTQueryWithTableAndOutputImpl<ASTShowCreat
class ASTExistsDatabaseQuery : public ASTQueryWithTableAndOutputImpl<ASTExistsDatabaseQueryIDAndQueryNames>
{
public:
ASTPtr clone() const override
{
auto res = std::make_shared<ASTExistsDatabaseQuery>(*this);
res->children.clear();
cloneTableOptions(*res);
return res;
}
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{
@ -98,6 +107,15 @@ protected:
class ASTShowCreateDatabaseQuery : public ASTQueryWithTableAndOutputImpl<ASTShowCreateDatabaseQueryIDAndQueryNames>
{
public:
ASTPtr clone() const override
{
auto res = std::make_shared<ASTShowCreateDatabaseQuery>(*this);
res->children.clear();
cloneTableOptions(*res);
return res;
}
protected:
void formatQueryImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override
{

View File

@ -3,6 +3,8 @@
#include <DataTypes/DataTypesNumber.h>
#include <DataTypes/DataTypeNullable.h>
#include <Interpreters/Context.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/ConstantNode.h>
#include <Analyzer/WindowNode.h>

View File

@ -224,7 +224,9 @@ Chunk ParallelParsingInputFormat::read()
/// skipped all rows. For example, it can happen while using settings
/// input_format_allow_errors_num/input_format_allow_errors_ratio
/// and this segment contained only rows with errors.
/// Process the next unit.
/// Return this empty unit back to segmentator and process the next unit.
unit->status = READY_TO_INSERT;
segmentator_condvar.notify_all();
++reader_ticket_number;
unit = &processing_units[reader_ticket_number % processing_units.size()];
}

View File

@ -8,6 +8,7 @@
#include <IO/Operators.h>
#include <Interpreters/Aggregator.h>
#include <Interpreters/Context.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/Merges/AggregatingSortedTransform.h>
#include <Processors/Merges/FinishAggregatingInOrderTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>

View File

@ -1,10 +1,11 @@
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
#include <Interpreters/ExpressionActions.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <Processors/QueryPlan/CubeStep.h>
#include <Processors/Transforms/CubeTransform.h>
#include <Processors/Transforms/ExpressionTransform.h>
#include <Processors/QueryPlan/AggregatingStep.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/FunctionFactory.h>
namespace DB
{

View File

@ -18,6 +18,7 @@
#include <Common/logger_useful.h>
#include <Storages/StorageDummy.h>
#include <Storages/VirtualColumnUtils.h>
#include <Planner/PlannerExpressionAnalysis.h>
#include <Interpreters/InterpreterSelectQuery.h>
#include <Interpreters/InterpreterSelectQueryAnalyzer.h>
@ -464,6 +465,9 @@ AggregateProjectionCandidates getAggregateProjectionCandidates(
// LOG_TRACE(getLogger("optimizeUseProjections"), "Query DAG: {}", dag.dag->dumpDAG());
candidates.has_filter = dag.filter_node;
/// We can't use minmax projection if filter has non-deterministic functions.
if (dag.filter_node && !VirtualColumnUtils::isDeterministicInScopeOfQuery(dag.filter_node))
can_use_minmax_projection = false;
if (can_use_minmax_projection)
{

View File

@ -1,6 +1,7 @@
#include <memory>
#include <stdexcept>
#include <IO/Operators.h>
#include <Interpreters/Context.h>
#include <Processors/Merges/MergingSortedTransform.h>
#include <Processors/QueryPlan/SortingStep.h>
#include <Processors/Transforms/FinishSortingTransform.h>

View File

@ -337,7 +337,6 @@ void FilterTransform::doTransform(Chunk & chunk)
min_size_in_memory = size_in_memory;
first_non_constant_column = i;
}
break;
}
}
(void)min_size_in_memory; /// Suppress error of clang-analyzer-deadcode.DeadStores

View File

@ -126,8 +126,6 @@ bool ReadProgressCallback::onProgress(uint64_t read_rows, uint64_t read_bytes, c
CurrentThread::updatePerformanceCountersIfNeeded();
std::lock_guard lock(limits_and_quotas_mutex);
/// TODO: Should be done in PipelineExecutor.
for (const auto & limits : storage_limits)
limits.local_limits.speed_limits.throttle(progress.read_rows, progress.read_bytes, total_rows, total_stopwatch.elapsedMicroseconds(), limits.local_limits.timeout_overflow_mode);

View File

@ -41,7 +41,6 @@ private:
/// The total number of bytes to read. For progress bar.
std::atomic_size_t total_bytes = 0;
std::mutex limits_and_quotas_mutex;
Stopwatch total_stopwatch{CLOCK_MONOTONIC_COARSE}; /// Including waiting time
bool update_profile_events = true;

View File

@ -1071,6 +1071,8 @@ std::optional<UInt64> MergeTreeData::totalRowsByPartitionPredicateImpl(
auto virtual_columns_block = getBlockWithVirtualsForFilter({parts[0]});
auto filter_dag = VirtualColumnUtils::splitFilterDagForAllowedInputs(filter_actions_dag->getOutputs().at(0), nullptr);
if (!filter_dag)
return {};
/// Generate valid expressions for filtering
bool valid = true;

View File

@ -25,6 +25,8 @@ StorageSystemDisks::StorageSystemDisks(const StorageID & table_id_)
{"unreserved_space", std::make_shared<DataTypeUInt64>(), "Free space which is not taken by reservations (free_space minus the size of reservations taken by merges, inserts, and other disk write operations currently running)."},
{"keep_free_space", std::make_shared<DataTypeUInt64>(), "Amount of disk space that should stay free on disk in bytes. Defined in the keep_free_space_bytes parameter of disk configuration."},
{"type", std::make_shared<DataTypeString>(), "The disk type which tells where this disk stores the data - RAM, local drive or remote storage."},
{"object_storage_type", std::make_shared<DataTypeString>(), "Type of object storage if disk type is object_storage"},
{"metadata_type", std::make_shared<DataTypeString>(), "Type of metadata storage if disk type is object_storage"},
{"is_encrypted", std::make_shared<DataTypeUInt8>(), "Flag which shows whether this disk ecrypts the underlying data. "},
{"is_read_only", std::make_shared<DataTypeUInt8>(), "Flag which indicates that you can only perform read operations with this disk."},
{"is_write_once", std::make_shared<DataTypeUInt8>(), "Flag which indicates if disk is write-once. Which means that it does support BACKUP to this disk, but does not support INSERT into MergeTree table on this disk."},
@ -53,6 +55,8 @@ Pipe StorageSystemDisks::read(
MutableColumnPtr col_unreserved = ColumnUInt64::create();
MutableColumnPtr col_keep = ColumnUInt64::create();
MutableColumnPtr col_type = ColumnString::create();
MutableColumnPtr col_object_storage_type = ColumnString::create();
MutableColumnPtr col_metadata_type = ColumnString::create();
MutableColumnPtr col_is_encrypted = ColumnUInt8::create();
MutableColumnPtr col_is_read_only = ColumnUInt8::create();
MutableColumnPtr col_is_write_once = ColumnUInt8::create();
@ -69,7 +73,9 @@ Pipe StorageSystemDisks::read(
col_unreserved->insert(disk_ptr->getUnreservedSpace().value_or(std::numeric_limits<UInt64>::max()));
col_keep->insert(disk_ptr->getKeepingFreeSpace());
auto data_source_description = disk_ptr->getDataSourceDescription();
col_type->insert(data_source_description.toString());
col_type->insert(magic_enum::enum_name(data_source_description.type));
col_object_storage_type->insert(magic_enum::enum_name(data_source_description.object_storage_type));
col_metadata_type->insert(magic_enum::enum_name(data_source_description.metadata_type));
col_is_encrypted->insert(data_source_description.is_encrypted);
col_is_read_only->insert(disk_ptr->isReadOnly());
col_is_write_once->insert(disk_ptr->isWriteOnce());
@ -91,6 +97,8 @@ Pipe StorageSystemDisks::read(
res_columns.emplace_back(std::move(col_unreserved));
res_columns.emplace_back(std::move(col_keep));
res_columns.emplace_back(std::move(col_type));
res_columns.emplace_back(std::move(col_object_storage_type));
res_columns.emplace_back(std::move(col_metadata_type));
res_columns.emplace_back(std::move(col_is_encrypted));
res_columns.emplace_back(std::move(col_is_read_only));
res_columns.emplace_back(std::move(col_is_write_once));

View File

@ -238,6 +238,23 @@ static bool canEvaluateSubtree(const ActionsDAG::Node * node, const Block & allo
return true;
}
bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node)
{
for (const auto * child : node->children)
{
if (!isDeterministicInScopeOfQuery(child))
return false;
}
if (node->type != ActionsDAG::ActionType::FUNCTION)
return true;
if (!node->function_base->isDeterministicInScopeOfQuery())
return false;
return true;
}
static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
const ActionsDAG::Node * node,
const Block * allowed_inputs,
@ -313,6 +330,10 @@ static const ActionsDAG::Node * splitFilterNodeForAllowedInputs(
}
}
}
else if (!isDeterministicInScopeOfQuery(node))
{
return nullptr;
}
}
if (allowed_inputs && !canEvaluateSubtree(node, *allowed_inputs))

View File

@ -25,6 +25,9 @@ void filterBlockWithPredicate(const ActionsDAG::Node * predicate, Block & block,
/// Just filters block. Block should contain all the required columns.
void filterBlockWithDAG(ActionsDAGPtr dag, Block & block, ContextPtr context);
/// Recursively checks if all functions used in DAG are deterministic in scope of query.
bool isDeterministicInScopeOfQuery(const ActionsDAG::Node * node);
/// Extract a part of predicate that can be evaluated using only columns from input_names.
ActionsDAGPtr splitFilterDagForAllowedInputs(const ActionsDAG::Node * predicate, const Block * allowed_inputs);

View File

@ -9,7 +9,6 @@
01747_join_view_filter_dictionary
01761_cast_to_enum_nullable
01925_join_materialized_columns
01952_optimize_distributed_group_by_sharding_key
02354_annoy
# Check after constants refactoring
02901_parallel_replicas_rollup

View File

@ -51,7 +51,7 @@ class Queue:
label: str
def get_scales() -> Tuple[int, int]:
def get_scales(runner_type: str) -> Tuple[int, int]:
"returns the multipliers for scaling down and up ASG by types"
# Scaling down is quicker on the lack of running jobs than scaling up on
# queue
@ -63,8 +63,12 @@ def get_scales() -> Tuple[int, int]:
# 10. I am trying 7 now.
# 7 still looks a bit slow, so I try 6
# Let's have it the same as the other ASG
#
# All type of style-checkers should be added very quickly to not block the workflows
# UPDATE THE COMMENT ON CHANGES
scale_up = 3
if "style" in runner_type:
scale_up = 1
return scale_down, scale_up
@ -95,7 +99,7 @@ def set_capacity(
continue
raise ValueError("Queue status is not in ['in_progress', 'queued']")
scale_down, scale_up = get_scales()
scale_down, scale_up = get_scales(runner_type)
# With lyfecycle hooks some instances are actually free because some of
# them are in 'Terminating:Wait' state
effective_capacity = max(
@ -138,7 +142,7 @@ def set_capacity(
logging.info(
"The ASG %s capacity will be increased to %s, current capacity=%s, "
"effective capacity=%sm maximum capacity=%s, running jobs=%s, queue size=%s",
"effective capacity=%s, maximum capacity=%s, running jobs=%s, queue size=%s",
asg["AutoScalingGroupName"],
desired_capacity,
effective_capacity,

Some files were not shown because too many files have changed in this diff Show More