diff --git a/.github/workflows/backport_branches.yml b/.github/workflows/backport_branches.yml
index cbd3bd7bec4..c52a58eac8a 100644
--- a/.github/workflows/backport_branches.yml
+++ b/.github/workflows/backport_branches.yml
@@ -683,3 +683,4 @@ jobs:
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py
+ python3 merge_pr.py
diff --git a/.github/workflows/docs_check.yml b/.github/workflows/docs_check.yml
index a513eb9216d..d69020d810e 100644
--- a/.github/workflows/docs_check.yml
+++ b/.github/workflows/docs_check.yml
@@ -169,3 +169,4 @@ jobs:
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py
+ python3 merge_pr.py --check-approved
diff --git a/.github/workflows/pull_request.yml b/.github/workflows/pull_request.yml
index aecf3799a5d..c677ec4bf5c 100644
--- a/.github/workflows/pull_request.yml
+++ b/.github/workflows/pull_request.yml
@@ -4388,3 +4388,4 @@ jobs:
run: |
cd "$GITHUB_WORKSPACE/tests/ci"
python3 finish_check.py
+ python3 merge_pr.py --check-approved
diff --git a/docker/test/fuzzer/run-fuzzer.sh b/docker/test/fuzzer/run-fuzzer.sh
index 2582b599d58..3458cf905da 100755
--- a/docker/test/fuzzer/run-fuzzer.sh
+++ b/docker/test/fuzzer/run-fuzzer.sh
@@ -5,6 +5,7 @@ set -x
# core.COMM.PID-TID
sysctl kernel.core_pattern='core.%e.%p-%P'
+dmesg --clear ||:
set -e
set -u
@@ -368,6 +369,7 @@ if [ -f core.zst ]; then
fi
rg --text -F '<Fatal>' server.log > fatal.log ||:
+dmesg -T > dmesg.log ||:
zstd --threads=0 server.log
@@ -396,6 +398,7 @@ p.links a { padding: 5px; margin: 3px; background: #FFF; line-height: 2; white-s
fuzzer.log
server.log.zst
main.log
+ dmesg.log
${CORE_LINK}
diff --git a/docs/en/engines/database-engines/postgresql.md b/docs/en/engines/database-engines/postgresql.md
index ac19794c167..939995a61c5 100644
--- a/docs/en/engines/database-engines/postgresql.md
+++ b/docs/en/engines/database-engines/postgresql.md
@@ -136,3 +136,7 @@ DESCRIBE TABLE test_database.test_table;
│ data │ Nullable(String) │
└────────┴───────────────────┘
```
+
+## Related content
+
+- Blog: [ClickHouse and PostgreSQL - a match made in data heaven - part 1](https://clickhouse.com/blog/migrating-data-between-clickhouse-postgres)
diff --git a/docs/en/engines/table-engines/integrations/postgresql.md b/docs/en/engines/table-engines/integrations/postgresql.md
index 7f9659400b8..b73d28c8508 100644
--- a/docs/en/engines/table-engines/integrations/postgresql.md
+++ b/docs/en/engines/table-engines/integrations/postgresql.md
@@ -175,3 +175,6 @@ CREATE TABLE pg_table_schema_with_dots (a UInt32)
- [The `postgresql` table function](../../../sql-reference/table-functions/postgresql.md)
- [Using PostgreSQL as a dictionary source](../../../sql-reference/dictionaries/external-dictionaries/external-dicts-dict-sources.md#dicts-external_dicts_dict_sources-postgresql)
+
+## Related content
+- Blog: [ClickHouse and PostgreSQL - a match made in data heaven - part 1](https://clickhouse.com/blog/migrating-data-between-clickhouse-postgres)
diff --git a/docs/en/operations/utilities/clickhouse-local.md b/docs/en/operations/utilities/clickhouse-local.md
index 963ddfe7a02..a4fa5579638 100644
--- a/docs/en/operations/utilities/clickhouse-local.md
+++ b/docs/en/operations/utilities/clickhouse-local.md
@@ -120,5 +120,6 @@ Read 186 rows, 4.15 KiB in 0.035 sec., 5302 rows/sec., 118.34 KiB/sec.
## Related Content
+- [Extracting, converting, and querying data in local files using clickhouse-local](https://clickhouse.com/blog/extracting-converting-querying-local-files-with-sql-clickhouse-local)
- [Getting Data Into ClickHouse - Part 1](https://clickhouse.com/blog/getting-data-into-clickhouse-part-1)
- [Exploring massive, real-world data sets: 100+ Years of Weather Records in ClickHouse](https://clickhouse.com/blog/real-world-data-noaa-climate-data)
diff --git a/docs/en/sql-reference/aggregate-functions/reference/index.md b/docs/en/sql-reference/aggregate-functions/reference/index.md
index 5d9423e0a55..bd8e72e0fec 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/index.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/index.md
@@ -57,6 +57,7 @@ ClickHouse-specific aggregate functions:
- [uniqCombined](../../../sql-reference/aggregate-functions/reference/uniqcombined.md)
- [uniqCombined64](../../../sql-reference/aggregate-functions/reference/uniqcombined64.md)
- [uniqHLL12](../../../sql-reference/aggregate-functions/reference/uniqhll12.md)
+- [uniqTheta](../../../sql-reference/aggregate-functions/reference/uniqthetasketch.md)
- [quantile](../../../sql-reference/aggregate-functions/reference/quantile.md)
- [quantiles](../../../sql-reference/aggregate-functions/reference/quantiles.md)
- [quantileExact](../../../sql-reference/aggregate-functions/reference/quantileexact.md)
@@ -79,4 +80,4 @@ ClickHouse-specific aggregate functions:
- [cramersVBiasCorrected](./cramersvbiascorrected.md)
- [theilsU](./theilsu.md)
- [maxIntersections](./maxintersections.md)
-- [maxIntersectionsPosition](./maxintersectionsposition.md)
\ No newline at end of file
+- [maxIntersectionsPosition](./maxintersectionsposition.md)
diff --git a/docs/en/sql-reference/aggregate-functions/reference/quantileinterpolatedweighted.md b/docs/en/sql-reference/aggregate-functions/reference/quantileinterpolatedweighted.md
new file mode 100644
index 00000000000..07fcd187217
--- /dev/null
+++ b/docs/en/sql-reference/aggregate-functions/reference/quantileinterpolatedweighted.md
@@ -0,0 +1,68 @@
+---
+slug: /en/sql-reference/aggregate-functions/reference/quantileInterpolatedWeighted
+sidebar_position: 203
+---
+
+# quantileInterpolatedWeighted
+
+Computes [quantile](https://en.wikipedia.org/wiki/Quantile) of a numeric data sequence using linear interpolation, taking into account the weight of each element.
+
+To get the interpolated value, all the passed values are combined into an array, which is then sorted. Quantile interpolation is performed using the [weighted percentile method](https://en.wikipedia.org/wiki/Percentile#The_weighted_percentile_method): a cumulative distribution is built from the weights, and a linear interpolation over the weights and values computes the quantiles.
+
+When using multiple `quantile*` functions with different levels in a query, the internal states are not combined (that is, the query works less efficiently than it could). In this case, use the [quantiles](../../../sql-reference/aggregate-functions/reference/quantiles.md#quantiles) function.
+
+**Syntax**
+
+``` sql
+quantileInterpolatedWeighted(level)(expr, weight)
+```
+
+Alias: `medianInterpolatedWeighted`.
+
+**Arguments**
+
+- `level` — Level of quantile. Optional parameter. Constant floating-point number from 0 to 1. We recommend using a `level` value in the range of `[0.01, 0.99]`. Default value: 0.5. At `level=0.5` the function calculates [median](https://en.wikipedia.org/wiki/Median).
+- `expr` — Expression over the column values resulting in numeric [data types](../../../sql-reference/data-types/index.md#data_types), [Date](../../../sql-reference/data-types/date.md) or [DateTime](../../../sql-reference/data-types/datetime.md).
+- `weight` — Column with weights of sequence members. Weight is the number of occurrences of the value.
+
+**Returned value**
+
+- Quantile of the specified level.
+
+Type:
+
+- [Float64](../../../sql-reference/data-types/float.md) for numeric data type input.
+- [Date](../../../sql-reference/data-types/date.md) if input values have the `Date` type.
+- [DateTime](../../../sql-reference/data-types/datetime.md) if input values have the `DateTime` type.
+
+**Example**
+
+Input table:
+
+``` text
+┌─n─┬─val─┐
+│ 0 │ 3 │
+│ 1 │ 2 │
+│ 2 │ 1 │
+│ 5 │ 4 │
+└───┴─────┘
+```
+
+Query:
+
+``` sql
+SELECT quantileInterpolatedWeighted(n, val) FROM t
+```
+
+Result:
+
+``` text
+┌─quantileInterpolatedWeighted(n, val)─┐
+│ 1 │
+└──────────────────────────────────────┘
+```
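+
+Illustratively, the computation behind this result: the total weight is `10`; the sorted values `[0, 1, 2, 5]` get cumulative distribution points `[0.15, 0.4, 0.55, 0.8]` (computed as `(cumulative weight - weight/2) / total weight`); the default level `0.5` falls between `0.4` and `0.55`; interpolating between the corresponding values `1` and `2` gives about `1.67`, which is cast back to the integer input type as `1`.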
+
+**See Also**
+
+- [median](../../../sql-reference/aggregate-functions/reference/median.md#median)
+- [quantiles](../../../sql-reference/aggregate-functions/reference/quantiles.md#quantiles)
diff --git a/docs/en/sql-reference/aggregate-functions/reference/quantiles.md b/docs/en/sql-reference/aggregate-functions/reference/quantiles.md
index 5c9120fb8f4..57151915336 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/quantiles.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/quantiles.md
@@ -9,7 +9,7 @@ sidebar_position: 201
Syntax: `quantiles(level1, level2, …)(x)`
-All the quantile functions also have corresponding quantiles functions: `quantiles`, `quantilesDeterministic`, `quantilesTiming`, `quantilesTimingWeighted`, `quantilesExact`, `quantilesExactWeighted`, `quantilesTDigest`, `quantilesBFloat16`. These functions calculate all the quantiles of the listed levels in one pass, and return an array of the resulting values.
+All the quantile functions also have corresponding quantiles functions: `quantiles`, `quantilesDeterministic`, `quantilesTiming`, `quantilesTimingWeighted`, `quantilesExact`, `quantilesExactWeighted`, `quantileInterpolatedWeighted`, `quantilesTDigest`, `quantilesBFloat16`. These functions calculate all the quantiles of the listed levels in one pass, and return an array of the resulting values.
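+
+For example, the following computes three levels in one pass (an illustrative query using the `numbers` table function):
+
+``` sql
+SELECT quantiles(0.25, 0.5, 0.75)(number) FROM numbers(100)
+```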
## quantilesExactExclusive
diff --git a/docs/en/sql-reference/data-types/json.md b/docs/en/sql-reference/data-types/json.md
index ab1596b1760..d9099ba5ad3 100644
--- a/docs/en/sql-reference/data-types/json.md
+++ b/docs/en/sql-reference/data-types/json.md
@@ -6,6 +6,10 @@ sidebar_label: JSON
# JSON
+:::warning
+This feature is experimental and is not production ready. If you need to work with JSON documents, consider using [this guide](/docs/en/guides/developer/working-with-json/json-load-data.md) instead.
+:::
+
Stores JavaScript Object Notation (JSON) documents in a single column.
`JSON` is an alias for `Object('json')`.
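+
+A minimal illustrative sketch (table and column names are arbitrary):
+
+``` sql
+SET allow_experimental_object_type = 1;
+
+CREATE TABLE json_demo (o JSON) ENGINE = Memory;
+INSERT INTO json_demo VALUES ('{"a": 1, "b": {"c": "text"}}');
+SELECT o.a, o.b.c FROM json_demo;
+```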
diff --git a/docs/en/sql-reference/functions/array-functions.md b/docs/en/sql-reference/functions/array-functions.md
index c044b972754..9d2f89c1837 100644
--- a/docs/en/sql-reference/functions/array-functions.md
+++ b/docs/en/sql-reference/functions/array-functions.md
@@ -121,7 +121,7 @@ Accepts an empty array and returns a one-element array that is equal to the defa
## range(end), range(\[start, \] end \[, step\])
-Returns an array of `UInt` numbers from `start` to `end - 1` by `step`.
+Returns an array of numbers from `start` to `end - 1` by `step`. The supported types are [UInt8, UInt16, UInt32, UInt64, Int8, Int16, Int32, Int64](../data-types/int-uint.md).
**Syntax**
``` sql
@@ -130,31 +130,30 @@ range([start, ] end [, step])
**Arguments**
-- `start` — The first element of the array. Optional, required if `step` is used. Default value: 0. [UInt](../data-types/int-uint.md)
-- `end` — The number before which the array is constructed. Required. [UInt](../data-types/int-uint.md)
-- `step` — Determines the incremental step between each element in the array. Optional. Default value: 1. [UInt](../data-types/int-uint.md)
+- `start` — The first element of the array. Optional, required if `step` is used. Default value: 0.
+- `end` — The number before which the array is constructed. Required.
+- `step` — Determines the incremental step between each element in the array. Optional. Default value: 1.
**Returned value**
-- Array of `UInt` numbers from `start` to `end - 1` by `step`.
+- Array of numbers from `start` to `end - 1` by `step`.
**Implementation details**
-- All arguments must be positive values: `start`, `end`, `step` are `UInt` data types, as well as elements of the returned array.
+- All arguments `start`, `end`, and `step` must have one of the supported data types: `UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`. The type of the elements of the returned array is the supertype of all the argument types.
- An exception is thrown if query results in arrays with a total length of more than number of elements specified by the [function_range_max_elements_in_block](../../operations/settings/settings.md#settings-function_range_max_elements_in_block) setting.
-
**Examples**
Query:
``` sql
-SELECT range(5), range(1, 5), range(1, 5, 2);
+SELECT range(5), range(1, 5), range(1, 5, 2), range(-1, 5, 2);
```
Result:
```txt
-┌─range(5)────┬─range(1, 5)─┬─range(1, 5, 2)─┐
-│ [0,1,2,3,4] │ [1,2,3,4] │ [1,3] │
-└─────────────┴─────────────┴────────────────┘
+┌─range(5)────┬─range(1, 5)─┬─range(1, 5, 2)─┬─range(-1, 5, 2)─┐
+│ [0,1,2,3,4] │ [1,2,3,4] │ [1,3] │ [-1,1,3] │
+└─────────────┴─────────────┴────────────────┴─────────────────┘
```
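+
+As an illustration of the supertype rule described above:
+
+``` sql
+SELECT range(toInt8(-1), toUInt8(5), 2) AS r, toTypeName(r);
+```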
## array(x1, …), operator \[x1, …\]
diff --git a/docs/en/sql-reference/table-functions/generate.md b/docs/en/sql-reference/table-functions/generate.md
index dd56b47cd3a..380c8364090 100644
--- a/docs/en/sql-reference/table-functions/generate.md
+++ b/docs/en/sql-reference/table-functions/generate.md
@@ -39,3 +39,16 @@ SELECT * FROM generateRandom('a Array(Int8), d Decimal32(4), c Tuple(DateTime64(
│ [68] │ -67417.0770 │ ('2080-03-12 14:17:31.269','110425e5-413f-10a6-05ba-fa6b3e929f15') │
└──────────┴──────────────┴────────────────────────────────────────────────────────────────────┘
```
+
+```sql
+CREATE TABLE random (a Array(Int8), d Decimal32(4), c Tuple(DateTime64(3), UUID)) engine=Memory;
+INSERT INTO random SELECT * FROM generateRandom() LIMIT 2;
+SELECT * FROM random;
+```
+
+```text
+┌─a────────────────────────────┬────────────d─┬─c──────────────────────────────────────────────────────────────────┐
+│ [] │ 68091.8197 │ ('2037-10-02 12:44:23.368','039ecab7-81c2-45ee-208c-844e5c6c5652') │
+│ [8,-83,0,-22,65,9,-30,28,64] │ -186233.4909 │ ('2062-01-11 00:06:04.124','69563ea1-5ad1-f870-16d8-67061da0df25') │
+└──────────────────────────────┴──────────────┴────────────────────────────────────────────────────────────────────┘
+```
\ No newline at end of file
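+
+For reproducible output, an explicit random seed can be passed (an illustrative sketch; `generateRandom` also accepts optional `max_string_length` and `max_array_length` arguments after the seed):
+
+```sql
+SELECT * FROM generateRandom('a Array(Int8), d Decimal32(4)', 10, 5, 3) LIMIT 2;
+```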
diff --git a/docs/zh/sql-reference/functions/array-functions.md b/docs/zh/sql-reference/functions/array-functions.md
index 565304710cc..d150b94b8af 100644
--- a/docs/zh/sql-reference/functions/array-functions.md
+++ b/docs/zh/sql-reference/functions/array-functions.md
@@ -117,7 +117,7 @@ SELECT notEmpty([1,2]);
## range(end), range(\[start, \] end \[, step\]) {#range}
-返回一个以`step`作为增量步长的从`start`到`end - 1`的`UInt`类型数字数组。
+返回一个以`step`作为增量步长的从`start`到`end - 1`的整型数字数组, 支持类型包括[`UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`](../data-types/int-uint.md)。
**语法**
``` sql
@@ -126,31 +126,30 @@ range([start, ] end [, step])
**参数**
-- `start` — 数组的第一个元素。可选项,如果设置了`step`时同样需要`start`,默认值为:0,类型为[UInt](../data-types/int-uint.md)。
-- `end` — 计数到`end`结束,但不包括`end`,必填项,类型为[UInt](../data-types/int-uint.md)。
-- `step` — 确定数组中每个元素之间的增量步长。可选项,默认值为:1,类型为[UInt](../data-types/int-uint.md)。
+- `start` — 数组的第一个元素。可选项,如果设置了`step`时同样需要`start`,默认值为:0。
+- `end` — 计数到`end`结束,但不包括`end`,必填项。
+- `step` — 确定数组中每个元素之间的增量步长。可选项,默认值为:1。
**返回值**
-- 以`step`作为增量步长的从`start`到`end - 1`的`UInt`类型数字数组。
+- 以`step`作为增量步长的从`start`到`end - 1`的数字数组。
**注意事项**
-- 所有参数必须是正值:`start`、`end`、`step`,类型均为`UInt`,结果数组的元素与此相同。
+- 所有参数`start`、`end`、`step`必须属于以下几种类型之一:[`UInt8`, `UInt16`, `UInt32`, `UInt64`, `Int8`, `Int16`, `Int32`, `Int64`](../data-types/int-uint.md)。结果数组的元素数据类型为所有入参类型的最小超类,也必须属于以上几种类型之一。
- 如果查询结果的数组总长度超过[function_range_max_elements_in_block](../../operations/settings/settings.md#settings-function_range_max_elements_in_block)指定的元素数,将会抛出异常。
-
**示例**
查询语句:
``` sql
-SELECT range(5), range(1, 5), range(1, 5, 2);
+SELECT range(5), range(1, 5), range(1, 5, 2), range(-1, 5, 2);
```
结果:
```txt
-┌─range(5)────┬─range(1, 5)─┬─range(1, 5, 2)─┐
-│ [0,1,2,3,4] │ [1,2,3,4] │ [1,3] │
-└─────────────┴─────────────┴────────────────┘
+┌─range(5)────┬─range(1, 5)─┬─range(1, 5, 2)─┬─range(-1, 5, 2)─┐
+│ [0,1,2,3,4] │ [1,2,3,4] │ [1,3] │ [-1,1,3] │
+└─────────────┴─────────────┴────────────────┴─────────────────┘
```
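+
+作为上述超类规则的示例:
+
+``` sql
+SELECT range(toInt8(-1), toUInt8(5), 2) AS r, toTypeName(r);
+```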
## array(x1, …), operator \[x1, …\] {#arrayx1-operator-x1}
diff --git a/programs/server/Server.cpp b/programs/server/Server.cpp
index 742838d6433..419b80ccff2 100644
--- a/programs/server/Server.cpp
+++ b/programs/server/Server.cpp
@@ -140,6 +140,7 @@ namespace CurrentMetrics
namespace ProfileEvents
{
extern const Event MainConfigLoads;
+ extern const Event ServerStartupMilliseconds;
}
namespace fs = std::filesystem;
@@ -652,6 +653,8 @@ static void sanityChecks(Server & server)
int Server::main(const std::vector<std::string> & /*args*/)
try
{
+ Stopwatch startup_watch;
+
Poco::Logger * log = &logger();
UseSSL use_ssl;
@@ -1822,6 +1825,9 @@ try
LOG_INFO(log, "Ready for connections.");
}
+ startup_watch.stop();
+ ProfileEvents::increment(ProfileEvents::ServerStartupMilliseconds, startup_watch.elapsedMilliseconds());
+
try
{
global_context->startClusterDiscovery();
diff --git a/src/AggregateFunctions/AggregateFunctionHistogram.h b/src/AggregateFunctions/AggregateFunctionHistogram.h
index c559b3f115f..ac81f7466fa 100644
--- a/src/AggregateFunctions/AggregateFunctionHistogram.h
+++ b/src/AggregateFunctions/AggregateFunctionHistogram.h
@@ -207,7 +207,7 @@ private:
{
// Fuse points if their text representations differ only in last digit
auto min_diff = 10 * (points[left].mean + points[right].mean) * std::numeric_limits<Mean>::epsilon();
- if (points[left].mean + min_diff >= points[right].mean)
+ if (points[left].mean + std::fabs(min_diff) >= points[right].mean)
{
points[left] = points[left] + points[right];
}
diff --git a/src/AggregateFunctions/AggregateFunctionQuantile.h b/src/AggregateFunctions/AggregateFunctionQuantile.h
index 6427d03f089..49157acf690 100644
--- a/src/AggregateFunctions/AggregateFunctionQuantile.h
+++ b/src/AggregateFunctions/AggregateFunctionQuantile.h
@@ -232,6 +232,9 @@ struct NameQuantilesExactInclusive { static constexpr auto name = "quantilesExac
struct NameQuantileExactWeighted { static constexpr auto name = "quantileExactWeighted"; };
struct NameQuantilesExactWeighted { static constexpr auto name = "quantilesExactWeighted"; };
+struct NameQuantileInterpolatedWeighted { static constexpr auto name = "quantileInterpolatedWeighted"; };
+struct NameQuantilesInterpolatedWeighted { static constexpr auto name = "quantilesInterpolatedWeighted"; };
+
struct NameQuantileTiming { static constexpr auto name = "quantileTiming"; };
struct NameQuantileTimingWeighted { static constexpr auto name = "quantileTimingWeighted"; };
struct NameQuantilesTiming { static constexpr auto name = "quantilesTiming"; };
diff --git a/src/AggregateFunctions/AggregateFunctionQuantileInterpolatedWeighted.cpp b/src/AggregateFunctions/AggregateFunctionQuantileInterpolatedWeighted.cpp
new file mode 100644
index 00000000000..68b42376df7
--- /dev/null
+++ b/src/AggregateFunctions/AggregateFunctionQuantileInterpolatedWeighted.cpp
@@ -0,0 +1,70 @@
+#include <AggregateFunctions/AggregateFunctionFactory.h>
+#include <AggregateFunctions/AggregateFunctionQuantile.h>
+#include <AggregateFunctions/QuantileInterpolatedWeighted.h>
+#include <AggregateFunctions/Helpers.h>
+#include <DataTypes/DataTypeDate.h>
+#include <DataTypes/DataTypeDateTime.h>
+#include <Core/Field.h>
+
+
+namespace DB
+{
+struct Settings;
+
+namespace ErrorCodes
+{
+ extern const int ILLEGAL_TYPE_OF_ARGUMENT;
+}
+
+namespace
+{
+
+ template <typename Value, bool _> using FuncQuantileInterpolatedWeighted = AggregateFunctionQuantile<Value, QuantileInterpolatedWeighted<Value>, NameQuantileInterpolatedWeighted, true, void, false>;
+ template <typename Value, bool _> using FuncQuantilesInterpolatedWeighted = AggregateFunctionQuantile<Value, QuantileInterpolatedWeighted<Value>, NameQuantilesInterpolatedWeighted, true, void, true>;
+
+ template <template <typename, bool> class Function>
+ AggregateFunctionPtr createAggregateFunctionQuantile(
+ const std::string & name, const DataTypes & argument_types, const Array & params, const Settings *)
+ {
+ /// Second argument type check doesn't depend on the type of the first one.
+ Function::assertSecondArg(argument_types);
+
+ const DataTypePtr & argument_type = argument_types[0];
+ WhichDataType which(argument_type);
+
+#define DISPATCH(TYPE) \
+ if (which.idx == TypeIndex::TYPE) return std::make_shared<Function<TYPE, true>>(argument_types, params);
+ FOR_BASIC_NUMERIC_TYPES(DISPATCH)
+#undef DISPATCH
+ if (which.idx == TypeIndex::Date) return std::make_shared<Function<DataTypeDate::FieldType, false>>(argument_types, params);
+ if (which.idx == TypeIndex::DateTime) return std::make_shared<Function<DataTypeDateTime::FieldType, false>>(argument_types, params);
+
+ if (which.idx == TypeIndex::Decimal32) return std::make_shared<Function<Decimal32, false>>(argument_types, params);
+ if (which.idx == TypeIndex::Decimal64) return std::make_shared<Function<Decimal64, false>>(argument_types, params);
+ if (which.idx == TypeIndex::Decimal128) return std::make_shared<Function<Decimal128, false>>(argument_types, params);
+ if (which.idx == TypeIndex::Decimal256) return std::make_shared<Function<Decimal256, false>>(argument_types, params);
+ if (which.idx == TypeIndex::DateTime64) return std::make_shared<Function<DateTime64, false>>(argument_types, params);
+
+ if (which.idx == TypeIndex::Int128) return std::make_shared<Function<Int128, true>>(argument_types, params);
+ if (which.idx == TypeIndex::UInt128) return std::make_shared<Function<UInt128, true>>(argument_types, params);
+ if (which.idx == TypeIndex::Int256) return std::make_shared<Function<Int256, true>>(argument_types, params);
+ if (which.idx == TypeIndex::UInt256) return std::make_shared<Function<UInt256, true>>(argument_types, params);
+
+ throw Exception("Illegal type " + argument_type->getName() + " of argument for aggregate function " + name,
+ ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT);
+ }
+}
+
+void registerAggregateFunctionsQuantileInterpolatedWeighted(AggregateFunctionFactory & factory)
+{
+ /// For aggregate functions returning array we cannot return NULL on empty set.
+ AggregateFunctionProperties properties = { .returns_default_when_only_null = true };
+
+ factory.registerFunction(NameQuantileInterpolatedWeighted::name, createAggregateFunctionQuantile<FuncQuantileInterpolatedWeighted>);
+ factory.registerFunction(NameQuantilesInterpolatedWeighted::name, { createAggregateFunctionQuantile<FuncQuantilesInterpolatedWeighted>, properties });
+
+ /// 'median' is an alias for 'quantile'
+ factory.registerAlias("medianInterpolatedWeighted", NameQuantileInterpolatedWeighted::name);
+}
+
+}
diff --git a/src/AggregateFunctions/QuantileInterpolatedWeighted.h b/src/AggregateFunctions/QuantileInterpolatedWeighted.h
new file mode 100644
index 00000000000..95daeed2e57
--- /dev/null
+++ b/src/AggregateFunctions/QuantileInterpolatedWeighted.h
@@ -0,0 +1,308 @@
+#pragma once
+
+#include <base/sort.h>
+
+#include <Common/HashTable/HashMap.h>
+#include <Common/NaNUtils.h>
+
+
+namespace DB
+{
+struct Settings;
+
+namespace ErrorCodes
+{
+ extern const int NOT_IMPLEMENTED;
+}
+
+/** Approximates Quantile by:
+ * - sorting input values and weights
+ * - building a cumulative distribution based on weights
+ * - performing linear interpolation between the weights and values
+ *
+ */
+template <typename Value>
+struct QuantileInterpolatedWeighted
+{
+ struct Int128Hash
+ {
+ size_t operator()(Int128 x) const
+ {
+ return CityHash_v1_0_2::Hash128to64({x >> 64, x & 0xffffffffffffffffll});
+ }
+ };
+
+ using Weight = UInt64;
+ using UnderlyingType = NativeType<Value>;
+ using Hasher = std::conditional_t<std::is_same_v<Value, Decimal128>, Int128Hash, HashCRC32<UnderlyingType>>;
+
+ /// When creating, the hash table must be small.
+ using Map = HashMapWithStackMemory<UnderlyingType, Weight, Hasher, 4>;
+
+ Map map;
+
+ void add(const Value & x)
+ {
+ /// We must skip NaNs as they are not compatible with comparison sorting.
+ if (!isNaN(x))
+ ++map[x];
+ }
+
+ void add(const Value & x, Weight weight)
+ {
+ if (!isNaN(x))
+ map[x] += weight;
+ }
+
+ void merge(const QuantileInterpolatedWeighted & rhs)
+ {
+ for (const auto & pair : rhs.map)
+ map[pair.getKey()] += pair.getMapped();
+ }
+
+ void serialize(WriteBuffer & buf) const
+ {
+ map.write(buf);
+ }
+
+ void deserialize(ReadBuffer & buf)
+ {
+ typename Map::Reader reader(buf);
+ while (reader.next())
+ {
+ const auto & pair = reader.get();
+ map[pair.first] = pair.second;
+ }
+ }
+
+ Value get(Float64 level) const
+ {
+ return getImpl<Value>(level);
+ }
+
+ void getMany(const Float64 * levels, const size_t * indices, size_t size, Value * result) const
+ {
+ getManyImpl<Value>(levels, indices, size, result);
+ }
+
+ /// Unlike other quantile variants, getFloat is not supported here and throws.
+ Float64 getFloat(Float64) const
+ {
+ throw Exception("Method getFloat is not implemented for QuantileInterpolatedWeighted", ErrorCodes::NOT_IMPLEMENTED);
+ }
+
+ void getManyFloat(const Float64 *, const size_t *, size_t, Float64 *) const
+ {
+ throw Exception("Method getManyFloat is not implemented for QuantileInterpolatedWeighted", ErrorCodes::NOT_IMPLEMENTED);
+ }
+
+private:
+ using Pair = typename std::pair<UnderlyingType, Float64>;
+
+ /// Get the value of the `level` quantile. The level must be between 0 and 1.
+ template <typename T>
+ T getImpl(Float64 level) const
+ {
+ size_t size = map.size();
+
+ if (0 == size)
+ return std::numeric_limits<T>::quiet_NaN();
+
+ /// Maintain a vector of pair of values and weights for easier sorting and for building
+ /// a cumulative distribution using the provided weights.
+ std::vector<Pair> value_weight_pairs;
+ value_weight_pairs.reserve(size);
+
+ /// Note: weights are provided as 64-bit integers, but are accumulated and stored
+ /// as Float64, because the quantile estimation below involves division and
+ /// therefore requires Float64 precision.
+
+ Float64 sum_weight = 0;
+ for (const auto & pair : map)
+ {
+ sum_weight += pair.getMapped();
+ auto value = pair.getKey();
+ auto weight = pair.getMapped();
+ value_weight_pairs.push_back({value, weight});
+ }
+
+ ::sort(value_weight_pairs.begin(), value_weight_pairs.end(), [](const Pair & a, const Pair & b) { return a.first < b.first; });
+
+ Float64 accumulated = 0;
+
+ /// vector for populating and storing the cumulative sum using the provided weights.
+ /// example: [0,1,2,3,4,5] -> [0,1,3,6,10,15]
+ std::vector<Float64> weights_cum_sum;
+ weights_cum_sum.reserve(size);
+
+ for (size_t idx = 0; idx < size; ++idx)
+ {
+ accumulated += value_weight_pairs[idx].second;
+ weights_cum_sum.push_back(accumulated);
+ }
+
+ /// The following estimation of quantile is general and the idea is:
+ /// https://en.wikipedia.org/wiki/Percentile#The_weighted_percentile_method
+
+ /// calculates a simple cumulative distribution based on weights
+ if (sum_weight != 0)
+ {
+ for (size_t idx = 0; idx < size; ++idx)
+ value_weight_pairs[idx].second = (weights_cum_sum[idx] - 0.5 * value_weight_pairs[idx].second) / sum_weight;
+ }
+
+ /// perform linear interpolation
+ size_t idx = 0;
+ if (size >= 2)
+ {
+ if (level >= value_weight_pairs[size - 2].second)
+ {
+ idx = size - 2;
+ }
+ else
+ {
+ size_t start = 0, end = size - 1;
+ while (start <= end)
+ {
+ size_t mid = start + (end - start) / 2;
+ if (mid > size)
+ break;
+ if (level > value_weight_pairs[mid + 1].second)
+ start = mid + 1;
+ else
+ {
+ idx = mid;
+ end = mid - 1;
+ }
+ }
+ }
+ }
+
+ size_t l = idx;
+ size_t u = idx + 1 < size ? idx + 1 : idx;
+
+ Float64 xl = value_weight_pairs[l].second, xr = value_weight_pairs[u].second;
+ UnderlyingType yl = value_weight_pairs[l].first, yr = value_weight_pairs[u].first;
+
+ if (level < xl)
+ yr = yl;
+ if (level > xr)
+ yl = yr;
+
+ return static_cast<T>(interpolate(level, xl, xr, yl, yr));
+ }
+
+ /// Get the `size` values of `levels` quantiles. Write `size` results starting with `result` address.
+ /// indices - an array of index levels such that the corresponding elements will go in ascending order.
+ template <typename T>
+ void getManyImpl(const Float64 * levels, const size_t * indices, size_t num_levels, Value * result) const
+ {
+ size_t size = map.size();
+
+ if (0 == size)
+ {
+ for (size_t i = 0; i < num_levels; ++i)
+ result[i] = Value();
+ return;
+ }
+
+ std::vector<Pair> value_weight_pairs;
+ value_weight_pairs.reserve(size);
+
+ Float64 sum_weight = 0;
+ for (const auto & pair : map)
+ {
+ sum_weight += pair.getMapped();
+ auto value = pair.getKey();
+ auto weight = pair.getMapped();
+ value_weight_pairs.push_back({value, weight});
+ }
+
+ ::sort(value_weight_pairs.begin(), value_weight_pairs.end(), [](const Pair & a, const Pair & b) { return a.first < b.first; });
+
+ Float64 accumulated = 0;
+
+ /// vector for populating and storing the cumulative sum using the provided weights.
+ /// example: [0,1,2,3,4,5] -> [0,1,3,6,10,15]
+ std::vector<Float64> weights_cum_sum;
+ weights_cum_sum.reserve(size);
+
+ for (size_t idx = 0; idx < size; ++idx)
+ {
+ accumulated += value_weight_pairs[idx].second;
+ weights_cum_sum.emplace_back(accumulated);
+ }
+
+
+ /// The following estimation of quantile is general and the idea is:
+ /// https://en.wikipedia.org/wiki/Percentile#The_weighted_percentile_method
+
+ /// calculates a simple cumulative distribution based on weights
+ if (sum_weight != 0)
+ {
+ for (size_t idx = 0; idx < size; ++idx)
+ value_weight_pairs[idx].second = (weights_cum_sum[idx] - 0.5 * value_weight_pairs[idx].second) / sum_weight;
+ }
+
+ for (size_t level_index = 0; level_index < num_levels; ++level_index)
+ {
+ /// perform linear interpolation for every level
+ auto level = levels[indices[level_index]];
+
+ size_t idx = 0;
+ if (size >= 2)
+ {
+ if (level >= value_weight_pairs[size - 2].second)
+ {
+ idx = size - 2;
+ }
+ else
+ {
+ size_t start = 0, end = size - 1;
+ while (start <= end)
+ {
+ size_t mid = start + (end - start) / 2;
+ if (mid > size)
+ break;
+ if (level > value_weight_pairs[mid + 1].second)
+ start = mid + 1;
+ else
+ {
+ idx = mid;
+ end = mid - 1;
+ }
+ }
+ }
+ }
+
+ size_t l = idx;
+ size_t u = idx + 1 < size ? idx + 1 : idx;
+
+ Float64 xl = value_weight_pairs[l].second, xr = value_weight_pairs[u].second;
+ UnderlyingType yl = value_weight_pairs[l].first, yr = value_weight_pairs[u].first;
+
+ if (level < xl)
+ yr = yl;
+ if (level > xr)
+ yl = yr;
+
+ result[indices[level_index]] = static_cast<Value>(interpolate(level, xl, xr, yl, yr));
+ }
+ }
+
+ /// This ignores overflows or NaN's that might arise during add, sub and mul operations and doesn't aim to provide exact
+ /// results since the `quantileInterpolatedWeighted` function itself relies mainly on approximation.
+ UnderlyingType NO_SANITIZE_UNDEFINED interpolate(Float64 level, Float64 xl, Float64 xr, UnderlyingType yl, UnderlyingType yr) const
+ {
+ UnderlyingType dy = yr - yl;
+ Float64 dx = xr - xl;
+ dx = dx == 0 ? 1 : dx; /// to handle NaN behavior that might arise during integer division below.
+
+ /// yl + (dy / dx) * (level - xl)
+ return static_cast<UnderlyingType>(yl + (dy / dx) * (level - xl));
+ }
+};
+
+}
diff --git a/src/AggregateFunctions/registerAggregateFunctions.cpp b/src/AggregateFunctions/registerAggregateFunctions.cpp
index ecf6ab51367..1fe759c122a 100644
--- a/src/AggregateFunctions/registerAggregateFunctions.cpp
+++ b/src/AggregateFunctions/registerAggregateFunctions.cpp
@@ -21,6 +21,7 @@ void registerAggregateFunctionsQuantile(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileDeterministic(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExact(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExactWeighted(AggregateFunctionFactory &);
+void registerAggregateFunctionsQuantileInterpolatedWeighted(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExactLow(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExactHigh(AggregateFunctionFactory &);
void registerAggregateFunctionsQuantileExactInclusive(AggregateFunctionFactory &);
@@ -106,6 +107,7 @@ void registerAggregateFunctions()
registerAggregateFunctionsQuantileDeterministic(factory);
registerAggregateFunctionsQuantileExact(factory);
registerAggregateFunctionsQuantileExactWeighted(factory);
+ registerAggregateFunctionsQuantileInterpolatedWeighted(factory);
registerAggregateFunctionsQuantileExactLow(factory);
registerAggregateFunctionsQuantileExactHigh(factory);
registerAggregateFunctionsQuantileExactInclusive(factory);
diff --git a/src/Analyzer/MatcherNode.cpp b/src/Analyzer/MatcherNode.cpp
index 9d822771087..fc74b4ff67e 100644
--- a/src/Analyzer/MatcherNode.cpp
+++ b/src/Analyzer/MatcherNode.cpp
@@ -11,6 +11,7 @@
#include
#include
#include
+#include <Parsers/ASTColumnsTransformers.h>
namespace DB
{
@@ -206,19 +207,43 @@ QueryTreeNodePtr MatcherNode::cloneImpl() const
ASTPtr MatcherNode::toASTImpl() const
{
ASTPtr result;
+ ASTPtr transformers;
+
+ if (!children.empty())
+ {
+ transformers = std::make_shared<ASTColumnsTransformerList>();
+
+ for (const auto & child : children)
+ transformers->children.push_back(child->toAST());
+ }
if (matcher_type == MatcherNodeType::ASTERISK)
{
if (qualified_identifier.empty())
{
- result = std::make_shared<ASTAsterisk>();
+ auto asterisk = std::make_shared<ASTAsterisk>();
+
+ if (transformers)
+ {
+ asterisk->transformers = std::move(transformers);
+ asterisk->children.push_back(asterisk->transformers);
+ }
+
+ result = asterisk;
}
else
{
auto qualified_asterisk = std::make_shared<ASTQualifiedAsterisk>();
auto identifier_parts = qualified_identifier.getParts();
- qualified_asterisk->children.push_back(std::make_shared<ASTIdentifier>(std::move(identifier_parts)));
+ qualified_asterisk->qualifier = std::make_shared<ASTIdentifier>(std::move(identifier_parts));
+ qualified_asterisk->children.push_back(qualified_asterisk->qualifier);
+
+ if (transformers)
+ {
+ qualified_asterisk->transformers = std::move(transformers);
+ qualified_asterisk->children.push_back(qualified_asterisk->transformers);
+ }
result = qualified_asterisk;
}
@@ -229,6 +254,13 @@ ASTPtr MatcherNode::toASTImpl() const
{
auto regexp_matcher = std::make_shared<ASTColumnsRegexpMatcher>();
regexp_matcher->setPattern(columns_matcher->pattern());
+
+ if (transformers)
+ {
+ regexp_matcher->transformers = std::move(transformers);
+ regexp_matcher->children.push_back(regexp_matcher->transformers);
+ }
+
result = regexp_matcher;
}
else
@@ -237,7 +269,14 @@ ASTPtr MatcherNode::toASTImpl() const
regexp_matcher->setPattern(columns_matcher->pattern());
auto identifier_parts = qualified_identifier.getParts();
- regexp_matcher->children.push_back(std::make_shared<ASTIdentifier>(std::move(identifier_parts)));
+ regexp_matcher->qualifier = std::make_shared<ASTIdentifier>(std::move(identifier_parts));
+ regexp_matcher->children.push_back(regexp_matcher->qualifier);
+
+ if (transformers)
+ {
+ regexp_matcher->transformers = std::move(transformers);
+ regexp_matcher->children.push_back(regexp_matcher->transformers);
+ }
result = regexp_matcher;
}
@@ -257,23 +296,36 @@ ASTPtr MatcherNode::toASTImpl() const
{
auto columns_list_matcher = std::make_shared<ASTColumnsListMatcher>();
columns_list_matcher->column_list = std::move(column_list);
+ columns_list_matcher->children.push_back(columns_list_matcher->column_list);
+
+ if (transformers)
+ {
+ columns_list_matcher->transformers = std::move(transformers);
+ columns_list_matcher->children.push_back(columns_list_matcher->transformers);
+ }
+
result = columns_list_matcher;
}
else
{
auto columns_list_matcher = std::make_shared<ASTQualifiedColumnsListMatcher>();
- columns_list_matcher->column_list = std::move(column_list);
auto identifier_parts = qualified_identifier.getParts();
- columns_list_matcher->children.push_back(std::make_shared<ASTIdentifier>(std::move(identifier_parts)));
+ columns_list_matcher->qualifier = std::make_shared<ASTIdentifier>(std::move(identifier_parts));
+ columns_list_matcher->column_list = std::move(column_list);
+ columns_list_matcher->children.push_back(columns_list_matcher->qualifier);
+ columns_list_matcher->children.push_back(columns_list_matcher->column_list);
+
+ if (transformers)
+ {
+ columns_list_matcher->transformers = std::move(transformers);
+ columns_list_matcher->children.push_back(columns_list_matcher->transformers);
+ }
result = columns_list_matcher;
}
}
- for (const auto & child : children)
- result->children.push_back(child->toAST());
-
return result;
}
diff --git a/src/Analyzer/QueryTreeBuilder.cpp b/src/Analyzer/QueryTreeBuilder.cpp
index 2b2326badfa..adaa878ae2f 100644
--- a/src/Analyzer/QueryTreeBuilder.cpp
+++ b/src/Analyzer/QueryTreeBuilder.cpp
@@ -111,7 +111,7 @@ private:
QueryTreeNodePtr buildJoinTree(const ASTPtr & tables_in_select_query, const ContextPtr & context) const;
- ColumnTransformersNodes buildColumnTransformers(const ASTPtr & matcher_expression, size_t start_child_index, const ContextPtr & context) const;
+ ColumnTransformersNodes buildColumnTransformers(const ASTPtr & matcher_expression, const ContextPtr & context) const;
ASTPtr query;
QueryTreeNodePtr query_tree_node;
@@ -439,13 +439,13 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
}
else if (const auto * asterisk = expression->as<ASTAsterisk>())
{
- auto column_transformers = buildColumnTransformers(expression, 0 /*start_child_index*/, context);
+ auto column_transformers = buildColumnTransformers(asterisk->transformers, context);
result = std::make_shared<MatcherNode>(std::move(column_transformers));
}
else if (const auto * qualified_asterisk = expression->as<ASTQualifiedAsterisk>())
{
- auto & qualified_identifier = qualified_asterisk->children.at(0)->as<ASTIdentifier &>();
- auto column_transformers = buildColumnTransformers(expression, 1 /*start_child_index*/, context);
+ auto & qualified_identifier = qualified_asterisk->qualifier->as<ASTIdentifier &>();
+ auto column_transformers = buildColumnTransformers(qualified_asterisk->transformers, context);
result = std::make_shared<MatcherNode>(Identifier(qualified_identifier.name_parts), std::move(column_transformers));
}
else if (const auto * ast_literal = expression->as<ASTLiteral>())
@@ -543,7 +543,7 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
}
else if (const auto * columns_regexp_matcher = expression->as<ASTColumnsRegexpMatcher>())
{
- auto column_transformers = buildColumnTransformers(expression, 0 /*start_child_index*/, context);
+ auto column_transformers = buildColumnTransformers(columns_regexp_matcher->transformers, context);
result = std::make_shared<MatcherNode>(columns_regexp_matcher->getMatcher(), std::move(column_transformers));
}
else if (const auto * columns_list_matcher = expression->as<ASTColumnsListMatcher>())
@@ -557,18 +557,18 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
column_list_identifiers.emplace_back(Identifier{column_list_identifier.name_parts});
}
- auto column_transformers = buildColumnTransformers(expression, 0 /*start_child_index*/, context);
+ auto column_transformers = buildColumnTransformers(columns_list_matcher->transformers, context);
result = std::make_shared<MatcherNode>(std::move(column_list_identifiers), std::move(column_transformers));
}
else if (const auto * qualified_columns_regexp_matcher = expression->as<ASTQualifiedColumnsRegexpMatcher>())
{
- auto & qualified_identifier = qualified_columns_regexp_matcher->children.at(0)->as<ASTIdentifier &>();
- auto column_transformers = buildColumnTransformers(expression, 1 /*start_child_index*/, context);
+ auto & qualified_identifier = qualified_columns_regexp_matcher->qualifier->as<ASTIdentifier &>();
+ auto column_transformers = buildColumnTransformers(qualified_columns_regexp_matcher->transformers, context);
result = std::make_shared<MatcherNode>(Identifier(qualified_identifier.name_parts), qualified_columns_regexp_matcher->getMatcher(), std::move(column_transformers));
}
else if (const auto * qualified_columns_list_matcher = expression->as<ASTQualifiedColumnsListMatcher>())
{
- auto & qualified_identifier = qualified_columns_list_matcher->children.at(0)->as<ASTIdentifier &>();
+ auto & qualified_identifier = qualified_columns_list_matcher->qualifier->as<ASTIdentifier &>();
Identifiers column_list_identifiers;
column_list_identifiers.reserve(qualified_columns_list_matcher->column_list->children.size());
@@ -579,7 +579,7 @@ QueryTreeNodePtr QueryTreeBuilder::buildExpression(const ASTPtr & expression, co
column_list_identifiers.emplace_back(Identifier{column_list_identifier.name_parts});
}
- auto column_transformers = buildColumnTransformers(expression, 1 /*start_child_index*/, context);
+ auto column_transformers = buildColumnTransformers(qualified_columns_list_matcher->transformers, context);
result = std::make_shared<MatcherNode>(Identifier(qualified_identifier.name_parts), std::move(column_list_identifiers), std::move(column_transformers));
}
else
@@ -833,15 +833,15 @@ QueryTreeNodePtr QueryTreeBuilder::buildJoinTree(const ASTPtr & tables_in_select
}
-ColumnTransformersNodes QueryTreeBuilder::buildColumnTransformers(const ASTPtr & matcher_expression, size_t start_child_index, const ContextPtr & context) const
+ColumnTransformersNodes QueryTreeBuilder::buildColumnTransformers(const ASTPtr & matcher_expression, const ContextPtr & context) const
{
ColumnTransformersNodes column_transformers;
- size_t children_size = matcher_expression->children.size();
- for (; start_child_index < children_size; ++start_child_index)
+ if (!matcher_expression)
+ return column_transformers;
+
+ for (const auto & child : matcher_expression->children)
{
- const auto & child = matcher_expression->children[start_child_index];
-
if (auto * apply_transformer = child->as<ASTColumnsApplyTransformer>())
{
if (apply_transformer->lambda)
diff --git a/src/Common/ProfileEvents.cpp b/src/Common/ProfileEvents.cpp
index abc28299f96..1b387c5a080 100644
--- a/src/Common/ProfileEvents.cpp
+++ b/src/Common/ProfileEvents.cpp
@@ -449,7 +449,8 @@ The server successfully detected this situation and will download merged part fr
M(OverflowBreak, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'break' and the result is incomplete.") \
M(OverflowThrow, "Number of times, data processing was cancelled by query complexity limitation with setting '*_overflow_mode' = 'throw' and exception was thrown.") \
M(OverflowAny, "Number of times approximate GROUP BY was in effect: when aggregation was performed only on top of first 'max_rows_to_group_by' unique keys and other keys were ignored due to 'group_by_overflow_mode' = 'any'.") \
-
+ \
+ M(ServerStartupMilliseconds, "Time elapsed from starting server to listening to sockets in milliseconds")\
namespace ProfileEvents
{
diff --git a/src/Compression/CompressionCodecDelta.cpp b/src/Compression/CompressionCodecDelta.cpp
index 31800b6b332..88534f6082b 100644
--- a/src/Compression/CompressionCodecDelta.cpp
+++ b/src/Compression/CompressionCodecDelta.cpp
@@ -28,6 +28,7 @@ protected:
bool isCompression() const override { return false; }
bool isGenericCompression() const override { return false; }
+ bool isDelta() const override { return true; }
private:
UInt8 delta_bytes_size;
diff --git a/src/Compression/CompressionCodecDoubleDelta.cpp b/src/Compression/CompressionCodecDoubleDelta.cpp
index dd2507ab14a..a18ce81e02b 100644
--- a/src/Compression/CompressionCodecDoubleDelta.cpp
+++ b/src/Compression/CompressionCodecDoubleDelta.cpp
@@ -133,6 +133,7 @@ protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
+ bool isDelta() const override { return true; }
private:
UInt8 data_bytes_size;
diff --git a/src/Compression/CompressionCodecFPC.cpp b/src/Compression/CompressionCodecFPC.cpp
index 48eba210b60..0468adc4610 100644
--- a/src/Compression/CompressionCodecFPC.cpp
+++ b/src/Compression/CompressionCodecFPC.cpp
@@ -39,6 +39,7 @@ protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
+ bool isFloatingPointTimeSeries() const override { return true; }
private:
static constexpr UInt32 HEADER_SIZE = 2;
diff --git a/src/Compression/CompressionCodecGorilla.cpp b/src/Compression/CompressionCodecGorilla.cpp
index 88b8c2bc3bb..a3aa5850e19 100644
--- a/src/Compression/CompressionCodecGorilla.cpp
+++ b/src/Compression/CompressionCodecGorilla.cpp
@@ -123,6 +123,7 @@ protected:
bool isCompression() const override { return true; }
bool isGenericCompression() const override { return false; }
+ bool isFloatingPointTimeSeries() const override { return true; }
private:
UInt8 data_bytes_size;
@@ -444,14 +445,19 @@ void CompressionCodecGorilla::doDecompressData(const char * source, UInt32 sourc
void registerCodecGorilla(CompressionCodecFactory & factory)
{
UInt8 method_code = static_cast<UInt8>(CompressionMethodByte::Gorilla);
- factory.registerCompressionCodecWithType("Gorilla", method_code,
- [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
+ auto codec_builder = [&](const ASTPtr & arguments, const IDataType * column_type) -> CompressionCodecPtr
{
if (arguments)
throw Exception("Codec Gorilla does not accept any arguments", ErrorCodes::BAD_ARGUMENTS);
+ if (column_type != nullptr)
+ if (!WhichDataType(*column_type).isFloat())
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Gorilla codec is not applicable for {} because the data type is not float",
+ column_type->getName());
+
UInt8 data_bytes_size = column_type ? getDataBytesSize(column_type) : 0;
return std::make_shared<CompressionCodecGorilla>(data_bytes_size);
- });
+ };
+ factory.registerCompressionCodecWithType("Gorilla", method_code, codec_builder);
}
}
diff --git a/src/Compression/CompressionFactoryAdditions.cpp b/src/Compression/CompressionFactoryAdditions.cpp
index 3e215076871..fd2d6a67aa8 100644
--- a/src/Compression/CompressionFactoryAdditions.cpp
+++ b/src/Compression/CompressionFactoryAdditions.cpp
@@ -59,15 +59,17 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
{
ASTPtr codecs_descriptions = std::make_shared<ASTExpressionList>();
- bool is_compression = false;
- bool has_none = false;
+ bool with_compressing_codec = false;
+ bool with_none_codec = false;
std::optional<size_t> generic_compression_codec_pos;
- std::set<size_t> encryption_codecs;
+ std::optional<size_t> first_delta_codec_pos;
+ std::optional<size_t> last_floating_point_time_series_codec_pos;
+ std::set<size_t> encryption_codecs_pos;
bool can_substitute_codec_arguments = true;
for (size_t i = 0, size = func->arguments->children.size(); i < size; ++i)
{
- const auto & inner_codec_ast = func->arguments->children[i];
+ const ASTPtr & inner_codec_ast = func->arguments->children[i];
String codec_family_name;
ASTPtr codec_arguments;
if (const auto * family_name = inner_codec_ast->as<ASTIdentifier>())
@@ -83,8 +85,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
else
throw Exception("Unexpected AST element for compression codec", ErrorCodes::UNEXPECTED_AST_STRUCTURE);
- /// Default codec replaced with current default codec which may depend on different
- /// settings (and properties of data) in runtime.
+ /// Replace "Default" codec by configured default codec which may depend on different settings and data properties at runtime.
CompressionCodecPtr result_codec;
if (codec_family_name == DEFAULT_CODEC_NAME)
{
@@ -136,21 +137,27 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
codecs_descriptions->children.emplace_back(result_codec->getCodecDesc());
}
- is_compression |= result_codec->isCompression();
- has_none |= result_codec->isNone();
+ with_compressing_codec |= result_codec->isCompression();
+ with_none_codec |= result_codec->isNone();
if (!generic_compression_codec_pos && result_codec->isGenericCompression())
generic_compression_codec_pos = i;
if (result_codec->isEncryption())
- encryption_codecs.insert(i);
+ encryption_codecs_pos.insert(i);
+
+ if (result_codec->isDelta() && !first_delta_codec_pos.has_value())
+ first_delta_codec_pos = i;
+
+ if (result_codec->isFloatingPointTimeSeries())
+ last_floating_point_time_series_codec_pos = i;
}
String codec_description = queryToString(codecs_descriptions);
if (sanity_check)
{
- if (codecs_descriptions->children.size() > 1 && has_none)
+ if (codecs_descriptions->children.size() > 1 && with_none_codec)
throw Exception(
"It does not make sense to have codec NONE along with other compression codecs: " + codec_description
+ ". (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).",
@@ -159,7 +166,7 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
/// Allow to explicitly specify single NONE codec if user don't want any compression.
/// But applying other transformations solely without compression (e.g. Delta) does not make sense.
/// It's okay to apply encryption codecs solely without anything else.
- if (!is_compression && !has_none && encryption_codecs.size() != codecs_descriptions->children.size())
+ if (!with_compressing_codec && !with_none_codec && encryption_codecs_pos.size() != codecs_descriptions->children.size())
throw Exception(
"Compression codec " + codec_description
+ " does not compress anything."
@@ -171,8 +178,8 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
/// It does not make sense to apply any non-encryption codecs
/// after encryption one.
- if (!encryption_codecs.empty() &&
- *encryption_codecs.begin() != codecs_descriptions->children.size() - encryption_codecs.size())
+ if (!encryption_codecs_pos.empty() &&
+ *encryption_codecs_pos.begin() != codecs_descriptions->children.size() - encryption_codecs_pos.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any non-post-processing codecs after"
" post-processing ones. (Note: you can enable setting 'allow_suspicious_codecs'"
@@ -181,11 +188,18 @@ ASTPtr CompressionCodecFactory::validateCodecAndGetPreprocessedAST(
/// It does not make sense to apply any transformations after generic compression algorithm
/// So, generic compression can be only one and only at the end.
if (generic_compression_codec_pos &&
- *generic_compression_codec_pos != codecs_descriptions->children.size() - 1 - encryption_codecs.size())
+ *generic_compression_codec_pos != codecs_descriptions->children.size() - 1 - encryption_codecs_pos.size())
throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
" because it does not make sense to apply any transformations after generic compression algorithm."
" (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
+ /// Floating point time series codecs usually have implicit delta compression (or something equivalent), so it does not make
+ /// sense to run delta compression manually. Another reason for blocking such combination is occasional data corruption (#45195).
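+ /// For example, `CODEC(Delta, Gorilla)` on a Float64 column is rejected by this check,
+ /// while `CODEC(Gorilla)` alone or `CODEC(Delta, ZSTD)` on integer data remains allowed.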
+ if (first_delta_codec_pos.has_value() && last_floating_point_time_series_codec_pos.has_value()
+ && (*first_delta_codec_pos < last_floating_point_time_series_codec_pos))
+ throw Exception("The combination of compression codecs " + codec_description + " is meaningless,"
+ " because it does not make sense to apply delta transformations before floating point time series codecs."
+ " (Note: you can enable setting 'allow_suspicious_codecs' to skip this check).", ErrorCodes::BAD_ARGUMENTS);
}
/// For columns with nested types like Tuple(UInt32, UInt64) we
diff --git a/src/Compression/ICompressionCodec.h b/src/Compression/ICompressionCodec.h
index f40404a84f3..c717392bbca 100644
--- a/src/Compression/ICompressionCodec.h
+++ b/src/Compression/ICompressionCodec.h
@@ -113,6 +113,12 @@ public:
/// If it does nothing.
virtual bool isNone() const { return false; }
+ /// If the only purpose of the codec is to delta (or double-delta) the data.
+ virtual bool isDelta() const { return false; }
+
+ /// If the codec is specialized for floating point time series.
+ virtual bool isFloatingPointTimeSeries() const { return false; }
+
protected:
/// This is used for fuzz testing
friend int LLVMFuzzerTestOneInput(const uint8_t * data, size_t size);
diff --git a/src/Compression/tests/gtest_compressionCodec.cpp b/src/Compression/tests/gtest_compressionCodec.cpp
index b4c29bf9ce6..a843144c7c4 100644
--- a/src/Compression/tests/gtest_compressionCodec.cpp
+++ b/src/Compression/tests/gtest_compressionCodec.cpp
@@ -529,6 +529,13 @@ public:
TEST_P(CodecTest, TranscodingWithDataType)
{
+ /// Gorilla can only be applied to floating point columns
+ bool codec_is_gorilla = std::get<0>(GetParam()).codec_statement.find("Gorilla") != std::string::npos;
+ WhichDataType which(std::get<1>(GetParam()).data_type.get());
+ bool data_is_float = which.isFloat();
+ if (codec_is_gorilla && !data_is_float)
+ GTEST_SKIP() << "Skipping Gorilla-compressed integer column";
+
const auto codec = makeCodec(CODEC_WITH_DATA_TYPE);
testTranscoding(*codec);
}
@@ -1204,68 +1211,20 @@ auto DDperformanceTestSequence()
}
// prime numbers in ascending order with some random repetitions hit all the cases of Gorilla.
-auto PrimesWithMultiplierGenerator = [](int multiplier = 1)
-{
- return [multiplier](auto i)
- {
- static const int vals[] = {
- 2, 3, 5, 7, 11, 11, 13, 17, 19, 23, 29, 29, 31, 37, 41, 43,
- 47, 47, 53, 59, 61, 61, 67, 71, 73, 79, 83, 89, 89, 97, 101, 103,
- 107, 107, 109, 113, 113, 127, 127, 127
- };
- static const size_t count = sizeof(vals)/sizeof(vals[0]);
-
- return static_cast<UInt64>(vals[i % count]) * multiplier;
- };
-};
-
-template <typename ValueType>
-auto GCompatibilityTestSequence()
-{
- // Also multiply result by some factor to test large values on types that can hold those.
- return generateSeq<ValueType>(G(PrimesWithMultiplierGenerator(intExp10(sizeof(ValueType)))), 0, 42);
-}
-
-INSTANTIATE_TEST_SUITE_P(Gorilla,
- CodecTestCompatibility,
- ::testing::Combine(
- ::testing::Values(Codec("Gorilla")),
- ::testing::ValuesIn(std::initializer_list<std::tuple<CodecTestSequence, std::string>>{
- {
- GCompatibilityTestSequence<Int8>(),
- BIN_STR("\x95\x35\x00\x00\x00\x2a\x00\x00\x00\x01\x00\x2a\x00\x00\x00\x14\xe1\xdd\x25\xe5\x7b\x29\x86\xee\x2a\x16\x5a\xc5\x0b\x23\x75\x1b\x3c\xb1\x97\x8b\x5f\xcb\x43\xd9\xc5\x48\xab\x23\xaf\x62\x93\x71\x4a\x73\x0f\xc6\x0a")
- },
- {
- GCompatibilityTestSequence<UInt8>(),
- BIN_STR("\x95\x35\x00\x00\x00\x2a\x00\x00\x00\x01\x00\x2a\x00\x00\x00\x14\xe1\xdd\x25\xe5\x7b\x29\x86\xee\x2a\x16\x5a\xc5\x0b\x23\x75\x1b\x3c\xb1\x97\x8b\x5f\xcb\x43\xd9\xc5\x48\xab\x23\xaf\x62\x93\x71\x4a\x73\x0f\xc6\x0a")
- },
- {
- GCompatibilityTestSequence<Int16>(),
- BIN_STR("\x95\x52\x00\x00\x00\x54\x00\x00\x00\x02\x00\x2a\x00\x00\x00\xc8\x00\xdc\xfe\x66\xdb\x1f\x4e\xa7\xde\xdc\xd5\xec\x6e\xf7\x37\x3a\x23\xe7\x63\xf5\x6a\x8e\x99\x37\x34\xf9\xf8\x2e\x76\x35\x2d\x51\xbb\x3b\xc3\x6d\x13\xbf\x86\x53\x9e\x25\xe4\xaf\xaf\x63\xd5\x6a\x6e\x76\x35\x3a\x27\xd3\x0f\x91\xae\x6b\x33\x57\x6e\x64\xcc\x55\x81\xe4")
- },
- {
- GCompatibilityTestSequence<UInt16>(),
- BIN_STR("\x95\x52\x00\x00\x00\x54\x00\x00\x00\x02\x00\x2a\x00\x00\x00\xc8\x00\xdc\xfe\x66\xdb\x1f\x4e\xa7\xde\xdc\xd5\xec\x6e\xf7\x37\x3a\x23\xe7\x63\xf5\x6a\x8e\x99\x37\x34\xf9\xf8\x2e\x76\x35\x2d\x51\xbb\x3b\xc3\x6d\x13\xbf\x86\x53\x9e\x25\xe4\xaf\xaf\x63\xd5\x6a\x6e\x76\x35\x3a\x27\xd3\x0f\x91\xae\x6b\x33\x57\x6e\x64\xcc\x55\x81\xe4")
- },
- {
- GCompatibilityTestSequence<Int32>(),
- BIN_STR("\x95\x65\x00\x00\x00\xa8\x00\x00\x00\x04\x00\x2a\x00\x00\x00\x20\x4e\x00\x00\xe4\x57\x63\xc0\xbb\x67\xbc\xce\x91\x97\x99\x15\x9e\xe3\x36\x3f\x89\x5f\x8e\xf2\xec\x8e\xd3\xbf\x75\x43\x58\xc4\x7e\xcf\x93\x43\x38\xc6\x91\x36\x1f\xe7\xb6\x11\x6f\x02\x73\x46\xef\xe0\xec\x50\xfb\x79\xcb\x9c\x14\xfa\x13\xea\x8d\x66\x43\x48\xa0\xde\x3a\xcf\xff\x26\xe0\x5f\x93\xde\x5e\x7f\x6e\x36\x5e\xe6\xb4\x66\x5d\xb0\x0e\xc4")
- },
- {
- GCompatibilityTestSequence<UInt32>(),
- BIN_STR("\x95\x65\x00\x00\x00\xa8\x00\x00\x00\x04\x00\x2a\x00\x00\x00\x20\x4e\x00\x00\xe4\x57\x63\xc0\xbb\x67\xbc\xce\x91\x97\x99\x15\x9e\xe3\x36\x3f\x89\x5f\x8e\xf2\xec\x8e\xd3\xbf\x75\x43\x58\xc4\x7e\xcf\x93\x43\x38\xc6\x91\x36\x1f\xe7\xb6\x11\x6f\x02\x73\x46\xef\xe0\xec\x50\xfb\x79\xcb\x9c\x14\xfa\x13\xea\x8d\x66\x43\x48\xa0\xde\x3a\xcf\xff\x26\xe0\x5f\x93\xde\x5e\x7f\x6e\x36\x5e\xe6\xb4\x66\x5d\xb0\x0e\xc4")
- },
- {
- GCompatibilityTestSequence<Int64>(),
- BIN_STR("\x95\x91\x00\x00\x00\x50\x01\x00\x00\x08\x00\x2a\x00\x00\x00\x00\xc2\xeb\x0b\x00\x00\x00\x00\xe3\x2b\xa0\xa6\x19\x85\x98\xdc\x45\x74\x74\x43\xc2\x57\x41\x4c\x6e\x42\x79\xd9\x8f\x88\xa5\x05\xf3\xf1\x94\xa3\x62\x1e\x02\xdf\x05\x10\xf1\x15\x97\x35\x2a\x50\x71\x0f\x09\x6c\x89\xf7\x65\x1d\x11\xb7\xcc\x7d\x0b\x70\xc1\x86\x88\x48\x47\x87\xb6\x32\x26\xa7\x86\x87\x88\xd3\x93\x3d\xfc\x28\x68\x85\x05\x0b\x13\xc6\x5f\xd4\x70\xe1\x5e\x76\xf1\x9f\xf3\x33\x2a\x14\x14\x5e\x40\xc1\x5c\x28\x3f\xec\x43\x03\x05\x11\x91\xe8\xeb\x8e\x0a\x0e\x27\x21\x55\xcb\x39\xbc\x6a\xff\x11\x5d\x81\xa0\xa6\x10")
- },
- {
- GCompatibilityTestSequence<UInt64>(),
- BIN_STR("\x95\x91\x00\x00\x00\x50\x01\x00\x00\x08\x00\x2a\x00\x00\x00\x00\xc2\xeb\x0b\x00\x00\x00\x00\xe3\x2b\xa0\xa6\x19\x85\x98\xdc\x45\x74\x74\x43\xc2\x57\x41\x4c\x6e\x42\x79\xd9\x8f\x88\xa5\x05\xf3\xf1\x94\xa3\x62\x1e\x02\xdf\x05\x10\xf1\x15\x97\x35\x2a\x50\x71\x0f\x09\x6c\x89\xf7\x65\x1d\x11\xb7\xcc\x7d\x0b\x70\xc1\x86\x88\x48\x47\x87\xb6\x32\x26\xa7\x86\x87\x88\xd3\x93\x3d\xfc\x28\x68\x85\x05\x0b\x13\xc6\x5f\xd4\x70\xe1\x5e\x76\xf1\x9f\xf3\x33\x2a\x14\x14\x5e\x40\xc1\x5c\x28\x3f\xec\x43\x03\x05\x11\x91\xe8\xeb\x8e\x0a\x0e\x27\x21\x55\xcb\x39\xbc\x6a\xff\x11\x5d\x81\xa0\xa6\x10")
- },
- })
- )
-);
+// auto PrimesWithMultiplierGenerator = [](int multiplier = 1)
+// {
+// return [multiplier](auto i)
+// {
+// static const int vals[] = {
+// 2, 3, 5, 7, 11, 11, 13, 17, 19, 23, 29, 29, 31, 37, 41, 43,
+// 47, 47, 53, 59, 61, 61, 67, 71, 73, 79, 83, 89, 89, 97, 101, 103,
+// 107, 107, 109, 113, 113, 127, 127, 127
+// };
+// static const size_t count = sizeof(vals)/sizeof(vals[0]);
+//
// return static_cast<UInt64>(vals[i % count]) * multiplier;
+// };
+// };
// These 'tests' try to measure performance of encoding and decoding and hence only make sense to be run locally,
// also they require pretty big data to run against and generating this data slows down startup of unit test process.
diff --git a/src/Databases/DDLDependencyVisitor.cpp b/src/Databases/DDLDependencyVisitor.cpp
index f0137e5bd60..ffe84f6fb77 100644
--- a/src/Databases/DDLDependencyVisitor.cpp
+++ b/src/Databases/DDLDependencyVisitor.cpp
@@ -2,6 +2,7 @@
#include
#include
#include
+#include <Interpreters/misc.h>
#include
#include
#include
@@ -175,7 +176,7 @@ namespace
/// Finds dependencies of a function.
void visitFunction(const ASTFunction & function)
{
- if (function.name == "joinGet" || function.name == "dictHas" || function.name == "dictIsIn" || function.name.starts_with("dictGet"))
+ if (functionIsJoinGet(function.name) || functionIsDictGet(function.name))
{
/// dictGet('dict_name', attr_names, id_expr)
/// dictHas('dict_name', id_expr)
diff --git a/src/Databases/DDLLoadingDependencyVisitor.cpp b/src/Databases/DDLLoadingDependencyVisitor.cpp
index 8536d1c890d..3a61f821629 100644
--- a/src/Databases/DDLLoadingDependencyVisitor.cpp
+++ b/src/Databases/DDLLoadingDependencyVisitor.cpp
@@ -1,6 +1,7 @@
#include
#include
#include
+#include <Interpreters/misc.h>
#include
#include
#include
@@ -52,23 +53,41 @@ bool DDLMatcherBase::needChildVisit(const ASTPtr & node, const ASTPtr & child)
return true;
}
-ssize_t DDLMatcherBase::getPositionOfTableNameArgument(const ASTFunction & function)
+ssize_t DDLMatcherBase::getPositionOfTableNameArgumentToEvaluate(const ASTFunction & function)
{
- if (function.name == "joinGet" ||
- function.name == "dictHas" ||
- function.name == "dictIsIn" ||
- function.name.starts_with("dictGet"))
+ if (functionIsJoinGet(function.name) || functionIsDictGet(function.name))
return 0;
- if (Poco::toLower(function.name) == "in")
+ return -1;
+}
+
+ssize_t DDLMatcherBase::getPositionOfTableNameArgumentToVisit(const ASTFunction & function)
+{
+ ssize_t maybe_res = getPositionOfTableNameArgumentToEvaluate(function);
+ if (0 <= maybe_res)
+ return maybe_res;
+
+ if (functionIsInOrGlobalInOperator(function.name))
+ {
+ if (function.children.empty())
+ return -1;
+
+ const auto * args = function.children[0]->as<ASTExpressionList>();
+ if (!args || args->children.size() != 2)
+ return -1;
+
+ if (args->children[1]->as<ASTFunction>())
+ return -1;
+
return 1;
+ }
return -1;
}
void DDLLoadingDependencyVisitor::visit(const ASTFunction & function, Data & data)
{
- ssize_t table_name_arg_idx = getPositionOfTableNameArgument(function);
+ ssize_t table_name_arg_idx = getPositionOfTableNameArgumentToVisit(function);
if (table_name_arg_idx < 0)
return;
extractTableNameFromArgument(function, data, table_name_arg_idx);
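To make the split above concrete, here is a minimal, self-contained sketch of the two rules; the function and operator names are illustrative stand-ins, not ClickHouse's actual API:

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <string>

int positionToEvaluate(const std::string & name)
{
    /// dictGet*/dictHas/dictIsIn/joinGet: the first argument names the dictionary or
    /// join table, and a constant expression there may be folded to a literal.
    if (name == "joinGet" || name == "dictHas" || name == "dictIsIn" || name.rfind("dictGet", 0) == 0)
        return 0;
    return -1;
}

int positionToVisit(const std::string & name, bool rhs_is_plain_identifier)
{
    int pos = positionToEvaluate(name);
    if (pos >= 0)
        return pos;
    /// IN-like operators: the right operand is a loading dependency only when it is
    /// a bare identifier, not a subquery, tuple, or other function.
    if ((name == "in" || name == "notIn" || name == "globalIn" || name == "globalNotIn") && rhs_is_plain_identifier)
        return 1;
    return -1;
}

int main()
{
    assert(positionToEvaluate("dictGetString") == 0);
    assert(positionToVisit("in", true) == 1);
    assert(positionToVisit("in", false) == -1);
}

The distinction matters because evaluating the right operand of IN at dependency-collection time would be wrong, while merely visiting it to record a table name is safe.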
diff --git a/src/Databases/DDLLoadingDependencyVisitor.h b/src/Databases/DDLLoadingDependencyVisitor.h
index f987e885266..f173517f852 100644
--- a/src/Databases/DDLLoadingDependencyVisitor.h
+++ b/src/Databases/DDLLoadingDependencyVisitor.h
@@ -23,7 +23,8 @@ class DDLMatcherBase
{
public:
static bool needChildVisit(const ASTPtr & node, const ASTPtr & child);
- static ssize_t getPositionOfTableNameArgument(const ASTFunction & function);
+ static ssize_t getPositionOfTableNameArgumentToVisit(const ASTFunction & function);
+ static ssize_t getPositionOfTableNameArgumentToEvaluate(const ASTFunction & function);
};
/// Visits ASTCreateQuery and extracts the names of all tables which should be loaded before a specified table.
diff --git a/src/Databases/NormalizeAndEvaluateConstantsVisitor.cpp b/src/Databases/NormalizeAndEvaluateConstantsVisitor.cpp
index d9e494e7c9a..c4d1e8bda8c 100644
--- a/src/Databases/NormalizeAndEvaluateConstantsVisitor.cpp
+++ b/src/Databases/NormalizeAndEvaluateConstantsVisitor.cpp
@@ -23,7 +23,7 @@ void NormalizeAndEvaluateConstants::visit(const ASTFunction & function, Data & d
{
/// Replace expressions like "dictGet(currentDatabase() || '.dict', 'value', toUInt32(1))"
/// with "dictGet('db_name.dict', 'value', toUInt32(1))"
- ssize_t table_name_arg_idx = getPositionOfTableNameArgument(function);
+ ssize_t table_name_arg_idx = getPositionOfTableNameArgumentToEvaluate(function);
if (table_name_arg_idx < 0)
return;
diff --git a/src/Functions/MatchImpl.h b/src/Functions/MatchImpl.h
index edb0df3ae34..3622db234b5 100644
--- a/src/Functions/MatchImpl.h
+++ b/src/Functions/MatchImpl.h
@@ -118,6 +118,16 @@ struct MatchImpl
if (haystack_offsets.empty())
return;
+ /// Shortcut for the silly but practical case that the pattern matches everything/nothing independently of the haystack:
+ /// - col [not] [i]like '%' / '%%'
+ /// - match(col, '.*')
+ if ((is_like && (needle == "%" or needle == "%%")) || (!is_like && (needle == ".*" || needle == ".*?")))
+ {
+ for (auto & x : res)
+ x = !negate;
+ return;
+ }
+
/// Special case that the [I]LIKE expression reduces to finding a substring in a string
String strstr_pattern;
if (is_like && impl::likePatternIsSubstring(needle, strstr_pattern))
@@ -267,6 +277,16 @@ struct MatchImpl
if (haystack.empty())
return;
+ /// Shortcut for the silly but practical case that the pattern matches everything/nothing independently of the haystack:
+ /// - col [not] [i]like '%' / '%%'
+ /// - match(col, '.*')
+ if ((is_like && (needle == "%" or needle == "%%")) || (!is_like && (needle == ".*" || needle == ".*?")))
+ {
+ for (auto & x : res)
+ x = !negate;
+ return;
+ }
+
/// Special case that the [I]LIKE expression reduces to finding a substring in a string
String strstr_pattern;
if (is_like && impl::likePatternIsSubstring(needle, strstr_pattern))
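The shortcut above can be read in isolation: when the pattern matches every string regardless of the haystack, the whole result column is a constant. A standalone, hedged sketch of the same check (illustrative names, not MatchImpl's API):

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <string>
#include <vector>

bool patternIsTrivial(const std::string & needle, bool is_like)
{
    /// LIKE '%' / '%%' and match('.*') / match('.*?') accept every string.
    return is_like ? (needle == "%" || needle == "%%")
                   : (needle == ".*" || needle == ".*?");
}

int main()
{
    std::vector<unsigned char> res(4);
    bool negate = false;
    if (patternIsTrivial("%", /*is_like=*/ true))
        for (auto & x : res)
            x = !negate; /// col LIKE '%' is true for every row; NOT LIKE '%' false
    assert(res[0] == 1);
}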
diff --git a/src/Interpreters/DatabaseAndTableWithAlias.cpp b/src/Interpreters/DatabaseAndTableWithAlias.cpp
index 70825ea8292..7fb581c1b4d 100644
--- a/src/Interpreters/DatabaseAndTableWithAlias.cpp
+++ b/src/Interpreters/DatabaseAndTableWithAlias.cpp
@@ -28,13 +28,29 @@ DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableIdentifier &
database = current_database;
}
+DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database)
+{
+ alias = identifier.tryGetAlias();
+
+ if (identifier.name_parts.size() == 2)
+ std::tie(database, table) = std::tie(identifier.name_parts[0], identifier.name_parts[1]);
+ else if (identifier.name_parts.size() == 1)
+ table = identifier.name_parts[0];
+ else
+ throw Exception("Logical error: invalid identifier", ErrorCodes::LOGICAL_ERROR);
+
+ if (database.empty())
+ database = current_database;
+}
+
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTPtr & node, const String & current_database)
{
- const auto * identifier = node->as<ASTTableIdentifier>();
- if (!identifier)
- throw Exception("Logical error: table identifier expected", ErrorCodes::LOGICAL_ERROR);
-
- *this = DatabaseAndTableWithAlias(*identifier, current_database);
+ if (const auto * table_identifier = node->as<ASTTableIdentifier>())
+ *this = DatabaseAndTableWithAlias(*table_identifier, current_database);
+ else if (const auto * identifier = node->as<ASTIdentifier>())
+ *this = DatabaseAndTableWithAlias(*identifier, current_database);
+ else
+ throw Exception("Logical error: identifier or table identifier expected", ErrorCodes::LOGICAL_ERROR);
}
DatabaseAndTableWithAlias::DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database)
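The new ASTIdentifier overload implements the usual db.table splitting rule. A minimal sketch of just that rule, with illustrative types:

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <stdexcept>
#include <string>
#include <vector>

struct DbAndTable { std::string database, table; };

DbAndTable splitQualifiedName(const std::vector<std::string> & name_parts, const std::string & current_database)
{
    if (name_parts.size() == 2)
        return {name_parts[0], name_parts[1]};
    if (name_parts.size() == 1)
        return {current_database, name_parts[0]}; /// unqualified: fall back to the current database
    throw std::logic_error("invalid identifier");
}

int main()
{
    assert(splitQualifiedName({"t"}, "db").database == "db");
    assert(splitQualifiedName({"other", "t"}, "db").database == "other");
}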
diff --git a/src/Interpreters/DatabaseAndTableWithAlias.h b/src/Interpreters/DatabaseAndTableWithAlias.h
index 237c56d3ce3..58327ff1d81 100644
--- a/src/Interpreters/DatabaseAndTableWithAlias.h
+++ b/src/Interpreters/DatabaseAndTableWithAlias.h
@@ -14,6 +14,7 @@ namespace DB
{
class ASTSelectQuery;
+class ASTIdentifier;
class ASTTableIdentifier;
struct ASTTableExpression;
@@ -28,6 +29,7 @@ struct DatabaseAndTableWithAlias
DatabaseAndTableWithAlias() = default;
explicit DatabaseAndTableWithAlias(const ASTPtr & identifier_node, const String & current_database = "");
+ explicit DatabaseAndTableWithAlias(const ASTIdentifier & identifier, const String & current_database = "");
explicit DatabaseAndTableWithAlias(const ASTTableIdentifier & identifier, const String & current_database = "");
explicit DatabaseAndTableWithAlias(const ASTTableExpression & table_expression, const String & current_database = "");
diff --git a/src/Interpreters/GatherFunctionQuantileVisitor.cpp b/src/Interpreters/GatherFunctionQuantileVisitor.cpp
index 2abd7af1455..805fcfec181 100644
--- a/src/Interpreters/GatherFunctionQuantileVisitor.cpp
+++ b/src/Interpreters/GatherFunctionQuantileVisitor.cpp
@@ -25,6 +25,7 @@ static const std::unordered_map<String, String> quantile_fuse_name_mapping = {
{NameQuantileExactInclusive::name, NameQuantilesExactInclusive::name},
{NameQuantileExactLow::name, NameQuantilesExactLow::name},
{NameQuantileExactWeighted::name, NameQuantilesExactWeighted::name},
+ {NameQuantileInterpolatedWeighted::name, NameQuantilesInterpolatedWeighted::name},
{NameQuantileTDigest::name, NameQuantilesTDigest::name},
{NameQuantileTDigestWeighted::name, NameQuantilesTDigestWeighted::name},
{NameQuantileTiming::name, NameQuantilesTiming::name},
@@ -61,9 +62,11 @@ void GatherFunctionQuantileData::FuseQuantileAggregatesData::addFuncNode(ASTPtr
const auto & arguments = func->arguments->children;
+
bool need_two_args = func->name == NameQuantileDeterministic::name || func->name == NameQuantileExactWeighted::name
- || func->name == NameQuantileTimingWeighted::name || func->name == NameQuantileTDigestWeighted::name
- || func->name == NameQuantileBFloat16Weighted::name;
+ || func->name == NameQuantileInterpolatedWeighted::name || func->name == NameQuantileTimingWeighted::name
+ || func->name == NameQuantileTDigestWeighted::name || func->name == NameQuantileBFloat16Weighted::name;
+
if (arguments.size() != (need_two_args ? 2 : 1))
return;
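For context: the fuse rewrites several single-quantile calls over the same arguments into one multi-quantile call, and weighted variants take a second (weight) argument, which is why quantileInterpolatedWeighted joins the two-argument list. A small illustrative sketch of the mapping and arity check (assumed names, not the visitor's API):

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <string>
#include <unordered_map>

int main()
{
    const std::unordered_map<std::string, std::string> fuse_name_mapping = {
        {"quantileExactWeighted", "quantilesExactWeighted"},
        {"quantileInterpolatedWeighted", "quantilesInterpolatedWeighted"},
    };

    auto expectedArgs = [](const std::string & name)
    {
        /// weighted variants take (value, weight)
        return name.ends_with("Weighted") ? 2u : 1u;
    };

    assert(fuse_name_mapping.at("quantileInterpolatedWeighted") == "quantilesInterpolatedWeighted");
    assert(expectedArgs("quantileInterpolatedWeighted") == 2);
}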
diff --git a/src/Interpreters/InterpreterExplainQuery.cpp b/src/Interpreters/InterpreterExplainQuery.cpp
index 2e4fd50cd01..17a6b695088 100644
--- a/src/Interpreters/InterpreterExplainQuery.cpp
+++ b/src/Interpreters/InterpreterExplainQuery.cpp
@@ -288,6 +288,20 @@ struct ExplainSettings : public Settings
}
};
+struct QuerySyntaxSettings
+{
+ bool oneline = false;
+
+ constexpr static char name[] = "SYNTAX";
+
+ std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings =
+ {
+ {"oneline", oneline},
+ };
+
+ std::unordered_map<std::string, std::reference_wrapper<UInt64>> integer_settings;
+};
+
template <typename Settings>
ExplainSettings<Settings> checkAndGetSettings(const ASTPtr & ast_settings)
{
@@ -362,13 +376,12 @@ QueryPipeline InterpreterExplainQuery::executeImpl()
}
case ASTExplainQuery::AnalyzedSyntax:
{
- if (ast.getSettings())
- throw Exception("Settings are not supported for EXPLAIN SYNTAX query.", ErrorCodes::UNKNOWN_SETTING);
+ auto settings = checkAndGetSettings<QuerySyntaxSettings>(ast.getSettings());
ExplainAnalyzedSyntaxVisitor::Data data(getContext());
ExplainAnalyzedSyntaxVisitor(data).visit(query);
- ast.getExplainedQuery()->format(IAST::FormatSettings(buf, false));
+ ast.getExplainedQuery()->format(IAST::FormatSettings(buf, settings.oneline));
break;
}
case ASTExplainQuery::QueryTree:
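QuerySyntaxSettings reuses the ExplainSettings machinery, which resolves setting names through a map of reference_wrappers. A hedged sketch of that lookup pattern (illustrative, not the interpreter's actual helper):

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <functional>
#include <stdexcept>
#include <string>
#include <unordered_map>

struct SyntaxSettingsSketch
{
    bool oneline = false;

    std::unordered_map<std::string, std::reference_wrapper<bool>> boolean_settings{{"oneline", oneline}};

    void set(const std::string & name, bool value)
    {
        auto it = boolean_settings.find(name);
        if (it == boolean_settings.end())
            throw std::invalid_argument("Unknown setting: " + name);
        it->second.get() = value; /// writes through to the member
    }
};

int main()
{
    SyntaxSettingsSketch s;
    s.set("oneline", true);
    assert(s.oneline);
}

With this change, `EXPLAIN SYNTAX oneline = 1 SELECT ...` is accepted (the old code threw UNKNOWN_SETTING for any setting) and the rewritten query is formatted on a single line.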
diff --git a/src/Interpreters/JoinToSubqueryTransformVisitor.cpp b/src/Interpreters/JoinToSubqueryTransformVisitor.cpp
index a57b8d2354b..5500c274c23 100644
--- a/src/Interpreters/JoinToSubqueryTransformVisitor.cpp
+++ b/src/Interpreters/JoinToSubqueryTransformVisitor.cpp
@@ -49,7 +49,8 @@ ASTPtr makeSubqueryTemplate()
ASTPtr makeSubqueryQualifiedAsterisk()
{
auto asterisk = std::make_shared<ASTQualifiedAsterisk>();
- asterisk->children.emplace_back(std::make_shared<ASTTableIdentifier>("--.s"));
+ asterisk->qualifier = std::make_shared<ASTIdentifier>("--.s");
+ asterisk->children.push_back(asterisk->qualifier);
return asterisk;
}
@@ -153,24 +154,34 @@ private:
for (auto & table_name : data.tables_order)
data.addTableColumns(table_name, columns);
- for (const auto & transformer : asterisk->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (asterisk->transformers)
+ {
+ for (const auto & transformer : asterisk->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else if (const auto * qualified_asterisk = child->as<ASTQualifiedAsterisk>())
{
has_asterisks = true;
- auto & identifier = child->children[0]->as<ASTTableIdentifier &>();
+ if (!qualified_asterisk->qualifier)
+ throw Exception("Logical error: qualified asterisk must have a qualifier", ErrorCodes::LOGICAL_ERROR);
+
+ auto & identifier = qualified_asterisk->qualifier->as<ASTIdentifier &>();
data.addTableColumns(identifier.name(), columns);
- // QualifiedAsterisk's transformers start to appear at child 1
- for (const auto * it = qualified_asterisk->children.begin() + 1; it != qualified_asterisk->children.end(); ++it)
+ if (qualified_asterisk->transformers)
{
- if (it->get()->as<ASTColumnsApplyTransformer>() || it->get()->as<ASTColumnsExceptTransformer>() || it->get()->as<ASTColumnsReplaceTransformer>())
- IASTColumnsTransformer::transform(*it, columns);
- else
- throw Exception("Logical error: qualified asterisk must only have children of IASTColumnsTransformer type", ErrorCodes::LOGICAL_ERROR);
+ for (const auto & transformer : qualified_asterisk->transformers->children)
+ {
+ if (transformer->as<ASTColumnsApplyTransformer>() ||
+ transformer->as<ASTColumnsExceptTransformer>() ||
+ transformer->as<ASTColumnsReplaceTransformer>())
+ IASTColumnsTransformer::transform(transformer, columns);
+ else
+ throw Exception("Logical error: qualified asterisk must only have children of IASTColumnsTransformer type", ErrorCodes::LOGICAL_ERROR);
+ }
}
}
else if (const auto * columns_list_matcher = child->as<ASTColumnsListMatcher>())
@@ -180,8 +191,11 @@ private:
for (const auto & ident : columns_list_matcher->column_list->children)
columns.emplace_back(ident->clone());
- for (const auto & transformer : columns_list_matcher->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (columns_list_matcher->transformers)
+ {
+ for (const auto & transformer : columns_list_matcher->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else if (const auto * columns_regexp_matcher = child->as<ASTColumnsRegexpMatcher>())
{
@@ -193,8 +207,11 @@ private:
columns,
[&](const String & column_name) { return columns_regexp_matcher->isColumnMatching(column_name); });
- for (const auto & transformer : columns_regexp_matcher->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (columns_regexp_matcher->transformers)
+ {
+ for (const auto & transformer : columns_regexp_matcher->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else
data.new_select_expression_list->children.push_back(child);
@@ -425,6 +442,7 @@ private:
{
if (data.expression_list->children.empty())
data.expression_list->children.emplace_back(std::make_shared<ASTAsterisk>());
+
select.setExpression(ASTSelectQuery::Expression::SELECT, std::move(data.expression_list));
}
data.done = true;
diff --git a/src/Interpreters/JoinedTables.cpp b/src/Interpreters/JoinedTables.cpp
index b88bb5d1caf..1d8676cfc57 100644
--- a/src/Interpreters/JoinedTables.cpp
+++ b/src/Interpreters/JoinedTables.cpp
@@ -154,7 +154,7 @@ private:
static void visit(const ASTQualifiedAsterisk & node, const ASTPtr &, Data & data)
{
- auto & identifier = node.children[0]->as<ASTTableIdentifier &>();
+ auto & identifier = node.qualifier->as<ASTIdentifier &>();
bool rewritten = false;
for (const auto & table : data)
{
diff --git a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
index 9c3a681fd32..36691885459 100644
--- a/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
+++ b/src/Interpreters/TranslateQualifiedNamesVisitor.cpp
@@ -156,21 +156,19 @@ void TranslateQualifiedNamesMatcher::visit(ASTFunction & node, const ASTPtr &, D
func_arguments->children.clear();
}
-void TranslateQualifiedNamesMatcher::visit(const ASTQualifiedAsterisk &, const ASTPtr & ast, Data & data)
+void TranslateQualifiedNamesMatcher::visit(const ASTQualifiedAsterisk & node, const ASTPtr &, Data & data)
{
- if (ast->children.empty())
- throw Exception("Logical error: qualified asterisk must have children", ErrorCodes::LOGICAL_ERROR);
-
- auto & ident = ast->children[0];
+ if (!node.qualifier)
+ throw Exception("Logical error: qualified asterisk must have a qualifier", ErrorCodes::LOGICAL_ERROR);
/// @note it could contain table alias as table name.
- DatabaseAndTableWithAlias db_and_table(ident);
+ DatabaseAndTableWithAlias db_and_table(node.qualifier);
for (const auto & known_table : data.tables)
if (db_and_table.satisfies(known_table.table, true))
return;
- throw Exception("Unknown qualified identifier: " + ident->getAliasOrColumnName(), ErrorCodes::UNKNOWN_IDENTIFIER);
+ throw Exception("Unknown qualified identifier: " + node.qualifier->getAliasOrColumnName(), ErrorCodes::UNKNOWN_IDENTIFIER);
}
void TranslateQualifiedNamesMatcher::visit(ASTTableJoin & join, const ASTPtr & , Data & data)
@@ -266,16 +264,22 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
first_table = false;
}
- for (const auto & transformer : asterisk->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (asterisk->transformers)
+ {
+ for (const auto & transformer : asterisk->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else if (auto * asterisk_column_list = child->as<ASTColumnsListMatcher>())
{
for (const auto & ident : asterisk_column_list->column_list->children)
columns.emplace_back(ident->clone());
- for (const auto & transformer : asterisk_column_list->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (asterisk_column_list->transformers)
+ {
+ for (const auto & transformer : asterisk_column_list->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else if (const auto * asterisk_regexp_pattern = child->as<ASTColumnsRegexpMatcher>())
{
@@ -292,12 +296,15 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
first_table = false;
}
- for (const auto & transformer : asterisk_regexp_pattern->children)
- IASTColumnsTransformer::transform(transformer, columns);
+ if (asterisk_regexp_pattern->transformers)
+ {
+ for (const auto & transformer : asterisk_regexp_pattern->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
+ }
}
else if (const auto * qualified_asterisk = child->as<ASTQualifiedAsterisk>())
{
- DatabaseAndTableWithAlias ident_db_and_name(qualified_asterisk->children[0]);
+ DatabaseAndTableWithAlias ident_db_and_name(qualified_asterisk->qualifier);
for (const auto & table : tables_with_columns)
{
@@ -309,10 +316,10 @@ void TranslateQualifiedNamesMatcher::visit(ASTExpressionList & node, const ASTPt
}
}
- // QualifiedAsterisk's transformers start to appear at child 1
- for (const auto * it = qualified_asterisk->children.begin() + 1; it != qualified_asterisk->children.end(); ++it)
+ if (qualified_asterisk->transformers)
{
- IASTColumnsTransformer::transform(*it, columns);
+ for (const auto & transformer : qualified_asterisk->transformers->children)
+ IASTColumnsTransformer::transform(transformer, columns);
}
}
else
diff --git a/src/Parsers/ASTAsterisk.cpp b/src/Parsers/ASTAsterisk.cpp
index e2f45d04fa4..1ffbb85da7c 100644
--- a/src/Parsers/ASTAsterisk.cpp
+++ b/src/Parsers/ASTAsterisk.cpp
@@ -8,21 +8,37 @@ namespace DB
ASTPtr ASTAsterisk::clone() const
{
auto clone = std::make_shared<ASTAsterisk>(*this);
- clone->cloneChildren();
+
+ if (expression) { clone->expression = expression->clone(); clone->children.push_back(clone->expression); }
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
return clone;
}
-void ASTAsterisk::appendColumnName(WriteBuffer & ostr) const { ostr.write('*'); }
+void ASTAsterisk::appendColumnName(WriteBuffer & ostr) const
+{
+ if (expression)
+ {
+ expression->appendColumnName(ostr);
+ writeCString(".", ostr);
+ }
+
+ ostr.write('*');
+}
void ASTAsterisk::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
+ if (expression)
+ {
+ expression->formatImpl(settings, state, frame);
+ settings.ostr << ".";
+ }
+
settings.ostr << "*";
- /// Format column transformers
- for (const auto & child : children)
+ if (transformers)
{
- settings.ostr << ' ';
- child->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
diff --git a/src/Parsers/ASTAsterisk.h b/src/Parsers/ASTAsterisk.h
index 027758ba48c..840b7996536 100644
--- a/src/Parsers/ASTAsterisk.h
+++ b/src/Parsers/ASTAsterisk.h
@@ -16,6 +16,8 @@ public:
ASTPtr clone() const override;
void appendColumnName(WriteBuffer & ostr) const override;
+ ASTPtr expression;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
};
diff --git a/src/Parsers/ASTColumnsMatcher.cpp b/src/Parsers/ASTColumnsMatcher.cpp
index 124206043cf..d301394cc54 100644
--- a/src/Parsers/ASTColumnsMatcher.cpp
+++ b/src/Parsers/ASTColumnsMatcher.cpp
@@ -18,12 +18,20 @@ namespace ErrorCodes
ASTPtr ASTColumnsRegexpMatcher::clone() const
{
auto clone = std::make_shared<ASTColumnsRegexpMatcher>(*this);
- clone->cloneChildren();
+
+ if (expression) { clone->expression = expression->clone(); clone->children.push_back(clone->expression); }
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
return clone;
}
void ASTColumnsRegexpMatcher::appendColumnName(WriteBuffer & ostr) const
{
+ if (expression)
+ {
+ expression->appendColumnName(ostr);
+ writeCString(".", ostr);
+ }
writeCString("COLUMNS(", ostr);
writeQuotedString(original_pattern, ostr);
writeChar(')', ostr);
@@ -38,15 +46,21 @@ void ASTColumnsRegexpMatcher::updateTreeHashImpl(SipHash & hash_state) const
void ASTColumnsRegexpMatcher::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
- settings.ostr << (settings.hilite ? hilite_keyword : "") << "COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
+ settings.ostr << (settings.hilite ? hilite_keyword : "");
+
+ if (expression)
+ {
+ expression->formatImpl(settings, state, frame);
+ settings.ostr << ".";
+ }
+
+ settings.ostr << "COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
settings.ostr << quoteString(original_pattern);
settings.ostr << ")";
- /// Format column transformers
- for (const auto & child : children)
+ if (transformers)
{
- settings.ostr << ' ';
- child->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
@@ -60,6 +74,11 @@ void ASTColumnsRegexpMatcher::setPattern(String pattern)
DB::ErrorCodes::CANNOT_COMPILE_REGEXP);
}
+const String & ASTColumnsRegexpMatcher::getPattern() const
+{
+ return original_pattern;
+}
+
const std::shared_ptr<re2::RE2> & ASTColumnsRegexpMatcher::getMatcher() const
{
return column_matcher;
@@ -73,19 +92,23 @@ bool ASTColumnsRegexpMatcher::isColumnMatching(const String & column_name) const
ASTPtr ASTColumnsListMatcher::clone() const
{
auto clone = std::make_shared<ASTColumnsListMatcher>(*this);
- clone->column_list = column_list->clone();
- clone->cloneChildren();
- return clone;
-}
-void ASTColumnsListMatcher::updateTreeHashImpl(SipHash & hash_state) const
-{
- column_list->updateTreeHash(hash_state);
- IAST::updateTreeHashImpl(hash_state);
+ if (expression) { clone->expression = expression->clone(); clone->children.push_back(clone->expression); }
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
+ clone->column_list = column_list->clone();
+ clone->children.push_back(clone->column_list);
+
+ return clone;
}
void ASTColumnsListMatcher::appendColumnName(WriteBuffer & ostr) const
{
+ if (expression)
+ {
+ expression->appendColumnName(ostr);
+ writeCString(".", ostr);
+ }
writeCString("COLUMNS(", ostr);
for (auto * it = column_list->children.begin(); it != column_list->children.end(); ++it)
{
@@ -99,7 +122,15 @@ void ASTColumnsListMatcher::appendColumnName(WriteBuffer & ostr) const
void ASTColumnsListMatcher::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
- settings.ostr << (settings.hilite ? hilite_keyword : "") << "COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
+ settings.ostr << (settings.hilite ? hilite_keyword : "");
+
+ if (expression)
+ {
+ expression->formatImpl(settings, state, frame);
+ settings.ostr << ".";
+ }
+
+ settings.ostr << "COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
for (ASTs::const_iterator it = column_list->children.begin(); it != column_list->children.end(); ++it)
{
@@ -111,33 +142,39 @@ void ASTColumnsListMatcher::formatImpl(const FormatSettings & settings, FormatSt
}
settings.ostr << ")";
- /// Format column transformers
- for (const auto & child : children)
+ if (transformers)
{
- settings.ostr << ' ';
- child->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
ASTPtr ASTQualifiedColumnsRegexpMatcher::clone() const
{
auto clone = std::make_shared<ASTQualifiedColumnsRegexpMatcher>(*this);
- clone->cloneChildren();
+
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
+ clone->qualifier = qualifier->clone();
+ clone->children.push_back(clone->qualifier);
+
return clone;
}
void ASTQualifiedColumnsRegexpMatcher::appendColumnName(WriteBuffer & ostr) const
{
- const auto & qualifier = children.at(0);
qualifier->appendColumnName(ostr);
writeCString(".COLUMNS(", ostr);
writeQuotedString(original_pattern, ostr);
writeChar(')', ostr);
}
-void ASTQualifiedColumnsRegexpMatcher::setPattern(String pattern)
+void ASTQualifiedColumnsRegexpMatcher::setPattern(String pattern, bool set_matcher)
{
original_pattern = std::move(pattern);
+
+ if (!set_matcher)
+ return;
+
column_matcher = std::make_shared<re2::RE2>(original_pattern, RE2::Quiet);
if (!column_matcher->ok())
throw DB::Exception(
@@ -166,35 +203,35 @@ void ASTQualifiedColumnsRegexpMatcher::formatImpl(const FormatSettings & setting
{
settings.ostr << (settings.hilite ? hilite_keyword : "");
- const auto & qualifier = children.at(0);
qualifier->formatImpl(settings, state, frame);
settings.ostr << ".COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
settings.ostr << quoteString(original_pattern);
settings.ostr << ")";
- /// Format column transformers
- size_t children_size = children.size();
-
- for (size_t i = 1; i < children_size; ++i)
+ if (transformers)
{
- const auto & child = children[i];
- settings.ostr << ' ';
- child->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
ASTPtr ASTQualifiedColumnsListMatcher::clone() const
{
auto clone = std::make_shared<ASTQualifiedColumnsListMatcher>(*this);
+
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
+ clone->qualifier = qualifier->clone();
clone->column_list = column_list->clone();
- clone->cloneChildren();
+
+ clone->children.push_back(clone->qualifier);
+ clone->children.push_back(clone->column_list);
+
return clone;
}
void ASTQualifiedColumnsListMatcher::appendColumnName(WriteBuffer & ostr) const
{
- const auto & qualifier = children.at(0);
qualifier->appendColumnName(ostr);
writeCString(".COLUMNS(", ostr);
@@ -208,19 +245,10 @@ void ASTQualifiedColumnsListMatcher::appendColumnName(WriteBuffer & ostr) const
writeChar(')', ostr);
}
-void ASTQualifiedColumnsListMatcher::updateTreeHashImpl(SipHash & hash_state) const
-{
- column_list->updateTreeHash(hash_state);
- IAST::updateTreeHashImpl(hash_state);
-}
-
void ASTQualifiedColumnsListMatcher::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
settings.ostr << (settings.hilite ? hilite_keyword : "");
-
- const auto & qualifier = children.at(0);
qualifier->formatImpl(settings, state, frame);
-
settings.ostr << ".COLUMNS" << (settings.hilite ? hilite_none : "") << "(";
for (ASTs::const_iterator it = column_list->children.begin(); it != column_list->children.end(); ++it)
@@ -232,14 +260,9 @@ void ASTQualifiedColumnsListMatcher::formatImpl(const FormatSettings & settings,
}
settings.ostr << ")";
- /// Format column transformers
- size_t children_size = children.size();
-
- for (size_t i = 1; i < children_size; ++i)
+ if (transformers)
{
- const auto & child = children[i];
- settings.ostr << ' ';
- child->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
diff --git a/src/Parsers/ASTColumnsMatcher.h b/src/Parsers/ASTColumnsMatcher.h
index 7ce246608b9..f31a8bd9a22 100644
--- a/src/Parsers/ASTColumnsMatcher.h
+++ b/src/Parsers/ASTColumnsMatcher.h
@@ -24,10 +24,13 @@ public:
void appendColumnName(WriteBuffer & ostr) const override;
void setPattern(String pattern);
+ const String & getPattern() const;
const std::shared_ptr<re2::RE2> & getMatcher() const;
bool isColumnMatching(const String & column_name) const;
void updateTreeHashImpl(SipHash & hash_state) const override;
+ ASTPtr expression;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
@@ -43,9 +46,10 @@ public:
String getID(char) const override { return "ColumnsListMatcher"; }
ASTPtr clone() const override;
void appendColumnName(WriteBuffer & ostr) const override;
- void updateTreeHashImpl(SipHash & hash_state) const override;
+ ASTPtr expression;
ASTPtr column_list;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
};
@@ -59,10 +63,12 @@ public:
void appendColumnName(WriteBuffer & ostr) const override;
const std::shared_ptr<re2::RE2> & getMatcher() const;
- void setPattern(String pattern);
+ void setPattern(String pattern, bool set_matcher = true);
void setMatcher(std::shared_ptr<re2::RE2> matcher);
void updateTreeHashImpl(SipHash & hash_state) const override;
+ ASTPtr qualifier;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
@@ -78,9 +84,10 @@ public:
String getID(char) const override { return "QualifiedColumnsListMatcher"; }
ASTPtr clone() const override;
void appendColumnName(WriteBuffer & ostr) const override;
- void updateTreeHashImpl(SipHash & hash_state) const override;
+ ASTPtr qualifier;
ASTPtr column_list;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState &, FormatStateStacked) const override;
};
diff --git a/src/Parsers/ASTColumnsTransformers.cpp b/src/Parsers/ASTColumnsTransformers.cpp
index 16752fa115e..f3bbeb6167b 100644
--- a/src/Parsers/ASTColumnsTransformers.cpp
+++ b/src/Parsers/ASTColumnsTransformers.cpp
@@ -19,6 +19,15 @@ namespace ErrorCodes
extern const int CANNOT_COMPILE_REGEXP;
}
+void ASTColumnsTransformerList::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
+{
+ for (const auto & child : children)
+ {
+ settings.ostr << ' ';
+ child->formatImpl(settings, state, frame);
+ }
+}
+
void IASTColumnsTransformer::transform(const ASTPtr & transformer, ASTs & nodes)
{
if (const auto * apply = transformer->as<ASTColumnsApplyTransformer>())
diff --git a/src/Parsers/ASTColumnsTransformers.h b/src/Parsers/ASTColumnsTransformers.h
index 5179726e8cb..f67993724c1 100644
--- a/src/Parsers/ASTColumnsTransformers.h
+++ b/src/Parsers/ASTColumnsTransformers.h
@@ -9,6 +9,23 @@ namespace re2
namespace DB
{
+
+/// A list of column transformers
+class ASTColumnsTransformerList : public IAST
+{
+public:
+ String getID(char) const override { return "ColumnsTransformerList"; }
+ ASTPtr clone() const override
+ {
+ auto clone = std::make_shared<ASTColumnsTransformerList>(*this);
+ clone->cloneChildren();
+ return clone;
+ }
+
+protected:
+ void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
+};
+
class IASTColumnsTransformer : public IAST
{
public:
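The recurring change in this patch is that matcher and asterisk nodes no longer keep transformers as loose trailing entries in `children` (with positional conventions like "transformers start at child 1"); they hang under one ASTColumnsTransformerList node that is linked into `children` only when non-empty. A hedged structural sketch of that ownership model (illustrative types, not the real AST classes):

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <memory>
#include <vector>

struct NodeSketch
{
    std::vector<std::shared_ptr<NodeSketch>> children;
};

struct MatcherSketch : NodeSketch
{
    std::shared_ptr<NodeSketch> qualifier;    /// named pointer, also linked into children
    std::shared_ptr<NodeSketch> transformers; /// optional list node, linked only if present

    void attach(std::shared_ptr<NodeSketch> q, std::shared_ptr<NodeSketch> t)
    {
        qualifier = std::move(q);
        children.push_back(qualifier);
        if (t && !t->children.empty())
        {
            transformers = std::move(t);
            children.push_back(transformers);
        }
    }
};

int main()
{
    MatcherSketch m;
    m.attach(std::make_shared<NodeSketch>(), std::make_shared<NodeSketch>());
    /// an empty transformer list is dropped, so children holds just the qualifier
    assert(m.children.size() == 1 && !m.transformers);
}

Keeping named pointers alongside `children` removes the index arithmetic that clone(), formatImpl() and the visitors previously had to agree on.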
diff --git a/src/Parsers/ASTExplainQuery.h b/src/Parsers/ASTExplainQuery.h
index 156ffdeacb9..cb8b3199c81 100644
--- a/src/Parsers/ASTExplainQuery.h
+++ b/src/Parsers/ASTExplainQuery.h
@@ -6,6 +6,10 @@
namespace DB
{
+namespace ErrorCodes
+{
+ extern const int BAD_ARGUMENTS;
+}
/// AST, EXPLAIN or other query with meaning of explanation query instead of execution
class ASTExplainQuery : public ASTQueryWithOutput
@@ -23,6 +27,45 @@ public:
CurrentTransaction, /// 'EXPLAIN CURRENT TRANSACTION'
};
+ static String toString(ExplainKind kind)
+ {
+ switch (kind)
+ {
+ case ParsedAST: return "EXPLAIN AST";
+ case AnalyzedSyntax: return "EXPLAIN SYNTAX";
+ case QueryTree: return "EXPLAIN QUERY TREE";
+ case QueryPlan: return "EXPLAIN";
+ case QueryPipeline: return "EXPLAIN PIPELINE";
+ case QueryEstimates: return "EXPLAIN ESTIMATE";
+ case TableOverride: return "EXPLAIN TABLE OVERRIDE";
+ case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
+ }
+
+ UNREACHABLE();
+ }
+
+ static ExplainKind fromString(const String & str)
+ {
+ if (str == "EXPLAIN AST")
+ return ParsedAST;
+ if (str == "EXPLAIN SYNTAX")
+ return AnalyzedSyntax;
+ if (str == "EXPLAIN QUERY TREE")
+ return QueryTree;
+ if (str == "EXPLAIN" || str == "EXPLAIN PLAN")
+ return QueryPlan;
+ if (str == "EXPLAIN PIPELINE")
+ return QueryPipeline;
+ if (str == "EXPLAIN ESTIMATE")
+ return QueryEstimates;
+ if (str == "EXPLAIN TABLE OVERRIDE")
+ return TableOverride;
+ if (str == "EXPLAIN CURRENT TRANSACTION")
+ return CurrentTransaction;
+
+ throw Exception(ErrorCodes::BAD_ARGUMENTS, "Unknown explain kind '{}'", str);
+ }
+
explicit ASTExplainQuery(ExplainKind kind_) : kind(kind_) {}
String getID(char delim) const override { return "Explain" + (delim + toString(kind)); }
@@ -103,23 +146,6 @@ private:
/// Used by EXPLAIN TABLE OVERRIDE
ASTPtr table_function;
ASTPtr table_override;
-
- static String toString(ExplainKind kind)
- {
- switch (kind)
- {
- case ParsedAST: return "EXPLAIN AST";
- case AnalyzedSyntax: return "EXPLAIN SYNTAX";
- case QueryTree: return "EXPLAIN QUERY TREE";
- case QueryPlan: return "EXPLAIN";
- case QueryPipeline: return "EXPLAIN PIPELINE";
- case QueryEstimates: return "EXPLAIN ESTIMATE";
- case TableOverride: return "EXPLAIN TABLE OVERRIDE";
- case CurrentTransaction: return "EXPLAIN CURRENT TRANSACTION";
- }
-
- UNREACHABLE();
- }
};
}
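toString/fromString were made public so the EXPLAIN kind can travel through a string argument (see the viewExplain rewrite below) and be recovered losslessly. A tiny round-trip sketch of the idea, with an assumed two-value enum rather than the real ExplainKind:

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <stdexcept>
#include <string>

enum class KindSketch { Ast, Syntax };

std::string toStringSketch(KindSketch k)
{
    switch (k)
    {
        case KindSketch::Ast: return "EXPLAIN AST";
        case KindSketch::Syntax: return "EXPLAIN SYNTAX";
    }
    throw std::logic_error("unreachable");
}

KindSketch fromStringSketch(const std::string & s)
{
    if (s == "EXPLAIN AST") return KindSketch::Ast;
    if (s == "EXPLAIN SYNTAX") return KindSketch::Syntax;
    throw std::invalid_argument("Unknown explain kind '" + s + "'");
}

int main()
{
    for (auto k : {KindSketch::Ast, KindSketch::Syntax})
        assert(fromStringSketch(toStringSketch(k)) == k); /// lossless round trip
}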
diff --git a/src/Parsers/ASTQualifiedAsterisk.cpp b/src/Parsers/ASTQualifiedAsterisk.cpp
index b755e4eb98c..2dcf481adb7 100644
--- a/src/Parsers/ASTQualifiedAsterisk.cpp
+++ b/src/Parsers/ASTQualifiedAsterisk.cpp
@@ -7,22 +7,18 @@ namespace DB
void ASTQualifiedAsterisk::appendColumnName(WriteBuffer & ostr) const
{
- const auto & qualifier = children.at(0);
qualifier->appendColumnName(ostr);
writeCString(".*", ostr);
}
void ASTQualifiedAsterisk::formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const
{
- const auto & qualifier = children.at(0);
qualifier->formatImpl(settings, state, frame);
settings.ostr << ".*";
- /// Format column transformers
- for (ASTs::const_iterator it = children.begin() + 1; it != children.end(); ++it)
+ if (transformers)
{
- settings.ostr << ' ';
- (*it)->formatImpl(settings, state, frame);
+ transformers->formatImpl(settings, state, frame);
}
}
diff --git a/src/Parsers/ASTQualifiedAsterisk.h b/src/Parsers/ASTQualifiedAsterisk.h
index 1b644532f53..e67b4cd82dd 100644
--- a/src/Parsers/ASTQualifiedAsterisk.h
+++ b/src/Parsers/ASTQualifiedAsterisk.h
@@ -17,11 +17,18 @@ public:
ASTPtr clone() const override
{
auto clone = std::make_shared<ASTQualifiedAsterisk>(*this);
- clone->cloneChildren();
+
+ if (transformers) { clone->transformers = transformers->clone(); clone->children.push_back(clone->transformers); }
+
+ clone->qualifier = qualifier->clone();
+ clone->children.push_back(clone->qualifier);
+
return clone;
}
void appendColumnName(WriteBuffer & ostr) const override;
+ ASTPtr qualifier;
+ ASTPtr transformers;
protected:
void formatImpl(const FormatSettings & settings, FormatState & state, FormatStateStacked frame) const override;
};
diff --git a/src/Parsers/ExpressionElementParsers.cpp b/src/Parsers/ExpressionElementParsers.cpp
index 5951128c285..231897605e0 100644
--- a/src/Parsers/ExpressionElementParsers.cpp
+++ b/src/Parsers/ExpressionElementParsers.cpp
@@ -28,6 +28,8 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
@@ -116,8 +118,40 @@ bool ParserSubquery::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
}
else if (ASTPtr explain_node; explain.parse(pos, explain_node, expected))
{
- /// Replace SELECT * FROM (EXPLAIN SELECT ...) with SELECT * FROM viewExplain(EXPLAIN SELECT ...)
- result_node = buildSelectFromTableFunction(makeASTFunction("viewExplain", explain_node));
+ const auto & explain_query = explain_node->as<const ASTExplainQuery &>();
+
+ if (explain_query.getTableFunction() || explain_query.getTableOverride())
+ throw Exception("EXPLAIN in a subquery cannot have a table function or table override", ErrorCodes::BAD_ARGUMENTS);
+
+ /// Replace subquery `(EXPLAIN <kind> <explain_settings> SELECT ...)`
+ /// with `(SELECT * FROM viewExplain("<kind>", "<explain_settings>", SELECT ...))`
+
+ String kind_str = ASTExplainQuery::toString(explain_query.getKind());
+
+ String settings_str;
+ if (ASTPtr settings_ast = explain_query.getSettings())
+ {
+ if (!settings_ast->as<ASTSetQuery>())
+ throw Exception("EXPLAIN settings must be a SET query", ErrorCodes::BAD_ARGUMENTS);
+ settings_str = queryToString(settings_ast);
+ }
+
+ const ASTPtr & explained_ast = explain_query.getExplainedQuery();
+ if (explained_ast)
+ {
+ auto view_explain = makeASTFunction("viewExplain",
+ std::make_shared<ASTLiteral>(kind_str),
+ std::make_shared<ASTLiteral>(settings_str),
+ explained_ast);
+ result_node = buildSelectFromTableFunction(view_explain);
+ }
+ else
+ {
+ auto view_explain = makeASTFunction("viewExplain",
+ std::make_shared<ASTLiteral>(kind_str),
+ std::make_shared<ASTLiteral>(settings_str));
+ result_node = buildSelectFromTableFunction(view_explain);
+ }
}
else
{
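In textual terms, the parser change above turns a subquery `(EXPLAIN <kind> <settings> <query>)` into a `viewExplain` call carrying the kind and the serialized settings as string literals. A hedged sketch of the resulting shape in plain strings (not the parser's AST API):

// Illustrative sketch; not part of the patch.
#include <iostream>
#include <string>

std::string rewriteExplainSubquery(const std::string & kind_str, const std::string & settings_str, const std::string & query)
{
    return "SELECT * FROM viewExplain('" + kind_str + "', '" + settings_str + "', (" + query + "))";
}

int main()
{
    /// prints: SELECT * FROM viewExplain('EXPLAIN SYNTAX', 'SET oneline = 1', (SELECT 1))
    std::cout << rewriteExplainSubquery("EXPLAIN SYNTAX", "SET oneline = 1", "SELECT 1") << '\n';
}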
@@ -1623,13 +1657,21 @@ bool ParserAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
++pos;
auto asterisk = std::make_shared<ASTAsterisk>();
+ auto transformers = std::make_shared<ASTColumnsTransformerList>();
ParserColumnsTransformers transformers_p(allowed_transformers);
ASTPtr transformer;
while (transformers_p.parse(pos, transformer, expected))
{
- asterisk->children.push_back(transformer);
+ transformers->children.push_back(transformer);
}
- node = asterisk;
+
+ if (!transformers->children.empty())
+ {
+ asterisk->transformers = std::move(transformers);
+ asterisk->children.push_back(asterisk->transformers);
+ }
+
+ node = std::move(asterisk);
return true;
}
return false;
@@ -1638,7 +1680,7 @@ bool ParserAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
bool ParserQualifiedAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
- if (!ParserCompoundIdentifier(true, true).parse(pos, node, expected))
+ if (!ParserCompoundIdentifier(false, true).parse(pos, node, expected))
return false;
if (pos->type != TokenType::Dot)
@@ -1650,13 +1692,23 @@ bool ParserQualifiedAsterisk::parseImpl(Pos & pos, ASTPtr & node, Expected & exp
++pos;
auto res = std::make_shared<ASTQualifiedAsterisk>();
- res->children.push_back(node);
+ auto transformers = std::make_shared<ASTColumnsTransformerList>();
ParserColumnsTransformers transformers_p;
ASTPtr transformer;
while (transformers_p.parse(pos, transformer, expected))
{
- res->children.push_back(transformer);
+ transformers->children.push_back(transformer);
}
+
+ res->qualifier = std::move(node);
+ res->children.push_back(res->qualifier);
+
+ if (!transformers->children.empty())
+ {
+ res->transformers = std::move(transformers);
+ res->children.push_back(res->transformers);
+ }
+
node = std::move(res);
return true;
}
@@ -1680,28 +1732,44 @@ static bool parseColumnsMatcherBody(IParser::Pos & pos, ASTPtr & node, Expected
return false;
++pos;
+ auto transformers = std::make_shared<ASTColumnsTransformerList>();
+ ParserColumnsTransformers transformers_p(allowed_transformers);
+ ASTPtr transformer;
+ while (transformers_p.parse(pos, transformer, expected))
+ {
+ transformers->children.push_back(transformer);
+ }
+
ASTPtr res;
if (column_list)
{
auto list_matcher = std::make_shared<ASTColumnsListMatcher>();
- list_matcher->column_list = column_list;
- res = list_matcher;
+
+ list_matcher->column_list = std::move(column_list);
+ list_matcher->children.push_back(list_matcher->column_list);
+
+ if (!transformers->children.empty())
+ {
+ list_matcher->transformers = std::move(transformers);
+ list_matcher->children.push_back(list_matcher->transformers);
+ }
+
+ node = std::move(list_matcher);
}
else
{
auto regexp_matcher = std::make_shared<ASTColumnsRegexpMatcher>();
regexp_matcher->setPattern(regex_node->as<ASTLiteral &>().value.get<String>());
- res = regexp_matcher;
+
+ if (!transformers->children.empty())
+ {
+ regexp_matcher->transformers = std::move(transformers);
+ regexp_matcher->children.push_back(regexp_matcher->transformers);
+ }
+
+ node = std::move(regexp_matcher);
}
- ParserColumnsTransformers transformers_p(allowed_transformers);
- ASTPtr transformer;
- while (transformers_p.parse(pos, transformer, expected))
- {
- res->children.push_back(transformer);
- }
-
- node = std::move(res);
return true;
}
@@ -1717,29 +1785,19 @@ bool ParserColumnsMatcher::parseImpl(Pos & pos, ASTPtr & node, Expected & expect
bool ParserQualifiedColumnsMatcher::parseImpl(Pos & pos, ASTPtr & node, Expected & expected)
{
- if (!ParserCompoundIdentifier(true, true).parse(pos, node, expected))
+ if (!ParserCompoundIdentifier(false, true).parse(pos, node, expected))
return false;
auto identifier_node = node;
- const auto & identifier_node_typed = identifier_node->as<ASTTableIdentifier &>();
+ auto & identifier_node_typed = identifier_node->as<ASTIdentifier &>();
+ auto & name_parts = identifier_node_typed.name_parts;
/// ParserCompoundIdentifier parses identifier.COLUMNS
- if (identifier_node_typed.name_parts.size() == 1 || identifier_node_typed.name_parts.back() != "COLUMNS")
+ if (name_parts.size() == 1 || name_parts.back() != "COLUMNS")
return false;
- /// TODO: ASTTableIdentifier can contain only 2 parts
-
- if (identifier_node_typed.name_parts.size() == 2)
- {
- auto table_name = identifier_node_typed.name_parts[0];
- identifier_node = std::make_shared<ASTTableIdentifier>(table_name);
- }
- else
- {
- throw Exception(ErrorCodes::LOGICAL_ERROR,
- "Expected identifier to contain no more than 2 parts. Actual {}",
- identifier_node_typed.full_name);
- }
+ name_parts.pop_back();
+ identifier_node = std::make_shared<ASTIdentifier>(std::move(name_parts), false, std::move(node->children));
if (!parseColumnsMatcherBody(pos, node, expected, allowed_transformers))
return false;
@@ -1747,28 +1805,36 @@ bool ParserQualifiedColumnsMatcher::parseImpl(Pos & pos, ASTPtr & node, Expected
if (auto * columns_list_matcher = node->as<ASTColumnsListMatcher>())
{
auto result = std::make_shared<ASTQualifiedColumnsListMatcher>();
+ result->qualifier = std::move(identifier_node);
result->column_list = std::move(columns_list_matcher->column_list);
- result->children.reserve(columns_list_matcher->children.size() + 1);
- result->children.push_back(std::move(identifier_node));
+ result->children.push_back(result->qualifier);
+ result->children.push_back(result->column_list);
- for (auto && child : columns_list_matcher->children)
- result->children.push_back(std::move(child));
+ if (columns_list_matcher->transformers)
+ {
+ result->transformers = std::move(columns_list_matcher->transformers);
+ result->children.push_back(result->transformers);
+ }
- node = result;
+ node = std::move(result);
}
else if (auto * column_regexp_matcher = node->as<ASTColumnsRegexpMatcher>())
{
auto result = std::make_shared<ASTQualifiedColumnsRegexpMatcher>();
+ result->setPattern(column_regexp_matcher->getPattern(), false);
result->setMatcher(column_regexp_matcher->getMatcher());
- result->children.reserve(column_regexp_matcher->children.size() + 1);
- result->children.push_back(std::move(identifier_node));
+ result->qualifier = std::move(identifier_node);
+ result->children.push_back(result->qualifier);
- for (auto && child : column_regexp_matcher->children)
- result->children.push_back(std::move(child));
+ if (column_regexp_matcher->transformers)
+ {
+ result->transformers = std::move(column_regexp_matcher->transformers);
+ result->children.push_back(result->transformers);
+ }
- node = result;
+ node = std::move(result);
}
else
{
diff --git a/src/Parsers/ExpressionListParsers.cpp b/src/Parsers/ExpressionListParsers.cpp
index 06befbef95e..2e20a68f9b1 100644
--- a/src/Parsers/ExpressionListParsers.cpp
+++ b/src/Parsers/ExpressionListParsers.cpp
@@ -4,6 +4,7 @@
#include
#include
+#include
#include
#include
#include
@@ -2194,7 +2195,7 @@ struct ParserExpressionImpl
using Layers = std::vector<std::unique_ptr<Layer>>;
Action tryParseOperand(Layers & layers, IParser::Pos & pos, Expected & expected);
- static Action tryParseOperator(Layers & layers, IParser::Pos & pos, Expected & expected);
+ Action tryParseOperator(Layers & layers, IParser::Pos & pos, Expected & expected);
};
@@ -2523,8 +2524,6 @@ Action ParserExpressionImpl::tryParseOperand(Layers & layers, IParser::Pos & pos
Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & pos, Expected & expected)
{
- ASTPtr tmp;
-
/// ParserExpression can be called in this part of the query:
/// ALTER TABLE partition_all2 CLEAR INDEX [ p ] IN PARTITION ALL
///
@@ -2544,17 +2543,17 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
if (cur_op == operators_table.end())
{
+ ASTPtr alias;
ParserAlias alias_parser(layers.back()->allow_alias_without_as_keyword);
- auto old_pos = pos;
+
if (layers.back()->allow_alias &&
!layers.back()->parsed_alias &&
- alias_parser.parse(pos, tmp, expected) &&
- layers.back()->insertAlias(tmp))
+ alias_parser.parse(pos, alias, expected) &&
+ layers.back()->insertAlias(alias))
{
layers.back()->parsed_alias = true;
return Action::OPERATOR;
}
- pos = old_pos;
return Action::NONE;
}
@@ -2618,33 +2617,57 @@ Action ParserExpressionImpl::tryParseOperator(Layers & layers, IParser::Pos & po
layers.back()->pushOperand(function);
}
+ /// A dot (the TupleElement operator) can be the beginning of a .* or .COLUMNS expression
+ if (op.type == OperatorType::TupleElement)
+ {
+ ASTPtr tmp;
+ if (asterisk_parser.parse(pos, tmp, expected) ||
+ columns_matcher_parser.parse(pos, tmp, expected))
+ {
+ if (auto * asterisk = tmp->as<ASTAsterisk>())
+ {
+ if (!layers.back()->popOperand(asterisk->expression))
+ return Action::NONE;
+ }
+ else if (auto * columns_list_matcher = tmp->as<ASTColumnsListMatcher>())
+ {
+ if (!layers.back()->popOperand(columns_list_matcher->expression))
+ return Action::NONE;
+ }
+ else if (auto * columns_regexp_matcher = tmp->as<ASTColumnsRegexpMatcher>())
+ {
+ if (!layers.back()->popOperand(columns_regexp_matcher->expression))
+ return Action::NONE;
+ }
+
+ layers.back()->pushOperand(std::move(tmp));
+ return Action::OPERATOR;
+ }
+ }
+
layers.back()->pushOperator(op);
- if (op.type == OperatorType::ArrayElement)
- layers.push_back(std::make_unique<ArrayElementLayer>());
-
-
- Action next = Action::OPERAND;
-
/// isNull & isNotNull are postfix unary operators
if (op.type == OperatorType::IsNull)
- next = Action::OPERATOR;
-
- if (op.type == OperatorType::StartBetween || op.type == OperatorType::StartNotBetween)
- layers.back()->between_counter++;
+ return Action::OPERATOR;
if (op.type == OperatorType::Cast)
{
- next = Action::OPERATOR;
-
ASTPtr type_ast;
if (!ParserDataType().parse(pos, type_ast, expected))
return Action::NONE;
layers.back()->pushOperand(std::make_shared<ASTLiteral>(queryToString(type_ast)));
+ return Action::OPERATOR;
}
- return next;
+ if (op.type == OperatorType::ArrayElement)
+ layers.push_back(std::make_unique<ArrayElementLayer>());
+
+ if (op.type == OperatorType::StartBetween || op.type == OperatorType::StartNotBetween)
+ layers.back()->between_counter++;
+
+ return Action::OPERAND;
}
}
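The TupleElement branch above works on the operand stack: after `t` has been pushed and `.` consumed, seeing `*` (or `COLUMNS(...)`) pops `t` and re-pushes it folded into the matcher as its qualifier/expression, instead of building a tupleElement() call. A toy stack sketch of that fold (illustrative, not the real parser):

// Illustrative sketch; not part of the patch.
#include <cassert>
#include <stack>
#include <string>

int main()
{
    std::stack<std::string> operands;
    operands.push("t");              /// left-hand side already parsed
    /// '.' was consumed as the TupleElement operator; the next token is '*'
    std::string qualifier = operands.top();
    operands.pop();
    operands.push(qualifier + ".*"); /// a qualified asterisk replaces the operand
    assert(operands.top() == "t.*");
}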
diff --git a/src/Storages/Distributed/DirectoryMonitor.cpp b/src/Storages/Distributed/DirectoryMonitor.cpp
index 39e91e19014..04fac6c3402 100644
--- a/src/Storages/Distributed/DirectoryMonitor.cpp
+++ b/src/Storages/Distributed/DirectoryMonitor.cpp
@@ -60,6 +60,7 @@ namespace ErrorCodes
extern const int TOO_MANY_PARTITIONS;
extern const int DISTRIBUTED_TOO_MANY_PENDING_BYTES;
extern const int ARGUMENT_OUT_OF_BOUND;
+ extern const int LOGICAL_ERROR;
}
@@ -365,18 +366,22 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
- BackgroundSchedulePool & bg_pool)
+ BackgroundSchedulePool & bg_pool,
+ bool initialize_from_disk)
: storage(storage_)
, pool(std::move(pool_))
, disk(disk_)
, relative_path(relative_path_)
, path(fs::path(disk->getPath()) / relative_path / "")
+ , broken_relative_path(fs::path(relative_path) / "broken")
+ , broken_path(fs::path(path) / "broken" / "")
, should_batch_inserts(storage.getDistributedSettingsRef().monitor_batch_inserts)
, split_batch_on_failure(storage.getDistributedSettingsRef().monitor_split_batch_on_failure)
, dir_fsync(storage.getDistributedSettingsRef().fsync_directories)
, min_batched_block_size_rows(storage.getContext()->getSettingsRef().min_insert_block_size_rows)
, min_batched_block_size_bytes(storage.getContext()->getSettingsRef().min_insert_block_size_bytes)
, current_batch_file_path(path + "current_batch.txt")
+ , pending_files(std::numeric_limits<size_t>::max())
, default_sleep_time(storage.getDistributedSettingsRef().monitor_sleep_time_ms.totalMilliseconds())
, sleep_time(default_sleep_time)
, max_sleep_time(storage.getDistributedSettingsRef().monitor_max_sleep_time_ms.totalMilliseconds())
@@ -385,6 +390,11 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
, metric_pending_files(CurrentMetrics::DistributedFilesToInsert, 0)
, metric_broken_files(CurrentMetrics::BrokenDistributedFilesToInsert, 0)
{
+ fs::create_directory(broken_path);
+
+ if (initialize_from_disk)
+ initializeFilesFromDisk();
+
task_handle = bg_pool.createTask(getLoggerName() + "/Bg", [this]{ run(); });
task_handle->activateAndSchedule();
}
@@ -392,35 +402,29 @@ StorageDistributedDirectoryMonitor::StorageDistributedDirectoryMonitor(
StorageDistributedDirectoryMonitor::~StorageDistributedDirectoryMonitor()
{
- if (!quit)
+ if (!pending_files.isFinished())
{
- quit = true;
+ pending_files.clearAndFinish();
task_handle->deactivate();
}
}
void StorageDistributedDirectoryMonitor::flushAllData()
{
- if (quit)
+ if (pending_files.isFinished())
return;
std::lock_guard lock{mutex};
-
- const auto & files = getFiles();
- if (!files.empty())
- {
- processFiles(files);
-
- /// Update counters.
- getFiles();
- }
+ if (!hasPendingFiles())
+ return;
+ processFiles();
}
void StorageDistributedDirectoryMonitor::shutdownAndDropAllData()
{
- if (!quit)
+ if (!pending_files.isFinished())
{
- quit = true;
+ pending_files.clearAndFinish();
task_handle->deactivate();
}
@@ -434,19 +438,21 @@ void StorageDistributedDirectoryMonitor::run()
std::lock_guard lock{mutex};
bool do_sleep = false;
- while (!quit)
+ while (!pending_files.isFinished())
{
do_sleep = true;
- const auto & files = getFiles();
- if (files.empty())
+ if (!hasPendingFiles())
break;
if (!monitor_blocker.isCancelled())
{
try
{
- do_sleep = !processFiles(files);
+ processFiles();
+ /// No errors while processing existing files.
+ /// Let's check whether there are more files to process.
+ do_sleep = false;
std::lock_guard status_lock(status_mutex);
status.last_exception = std::exception_ptr{};
@@ -470,9 +476,7 @@ void StorageDistributedDirectoryMonitor::run()
}
}
else
- {
LOG_DEBUG(log, "Skipping send data over distributed table.");
- }
const auto now = std::chrono::system_clock::now();
if (now - last_decrease_time > decrease_error_count_period)
@@ -487,10 +491,7 @@ void StorageDistributedDirectoryMonitor::run()
break;
}
- /// Update counters.
- getFiles();
-
- if (!quit && do_sleep)
+ if (!pending_files.isFinished() && do_sleep)
task_handle->scheduleAfter(sleep_time.count());
}
@@ -568,41 +569,83 @@ ConnectionPoolPtr StorageDistributedDirectoryMonitor::createPool(const std::stri
settings.distributed_replica_error_cap);
}
-
-std::map<UInt64, std::string> StorageDistributedDirectoryMonitor::getFiles()
+bool StorageDistributedDirectoryMonitor::hasPendingFiles() const
{
- std::map<UInt64, std::string> files;
+ return fs::exists(current_batch_file_path) || !current_batch_file.empty() || !pending_files.empty();
+}
+
+void StorageDistributedDirectoryMonitor::initializeFilesFromDisk()
+{
+ /// NOTE: This method does not require holding status_mutex; hence, no TSA
+ /// annotations in the header file.
fs::directory_iterator end;
- for (fs::directory_iterator it{path}; it != end; ++it)
+
+ /// Initialize pending files
{
- const auto & file_path_str = it->path();
- if (!it->is_directory() && startsWith(fs::path(file_path_str).extension(), ".bin"))
+ size_t bytes_count = 0;
+
+ for (fs::directory_iterator it{path}; it != end; ++it)
{
- files[parse<UInt64>(fs::path(file_path_str).stem())] = file_path_str;
+ const auto & file_path = it->path();
+ const auto & base_name = file_path.stem().string();
+ if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(base_name))
+ {
+ const std::string & file_path_str = file_path.string();
+ if (!pending_files.push(file_path_str))
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
+ bytes_count += fs::file_size(file_path);
+ }
+ else if (base_name != "tmp" && base_name != "broken")
+ {
+ /// It is OK to log current_batch.txt here too (useful for debugging).
+ LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), path);
+ }
}
+
+ LOG_TRACE(log, "Files set to {}", pending_files.size());
+ LOG_TRACE(log, "Bytes set to {}", bytes_count);
+
+ metric_pending_files.changeTo(pending_files.size());
+ status.files_count = pending_files.size();
+ status.bytes_count = bytes_count;
}
- return files;
+ /// Initialize broken files
+ {
+ size_t broken_bytes_count = 0;
+ size_t broken_files = 0;
+
+ for (fs::directory_iterator it{broken_path}; it != end; ++it)
+ {
+ const auto & file_path = it->path();
+ if (!it->is_directory() && startsWith(fs::path(file_path).extension(), ".bin") && parse<UInt64>(file_path.stem()))
+ {
+ broken_bytes_count += fs::file_size(file_path);
+ ++broken_files;
+ }
+ else
+ LOG_WARNING(log, "Unexpected file {} in {}", file_path.string(), broken_path);
+ }
+
+ LOG_TRACE(log, "Broken files set to {}", broken_files);
+ LOG_TRACE(log, "Broken bytes set to {}", broken_bytes_count);
+
+ metric_broken_files.changeTo(broken_files);
+ status.broken_files_count = broken_files;
+ status.broken_bytes_count = broken_bytes_count;
+ }
}
-bool StorageDistributedDirectoryMonitor::processFiles(const std::map<UInt64, std::string> & files)
+void StorageDistributedDirectoryMonitor::processFiles()
{
if (should_batch_inserts)
- {
- processFilesWithBatching(files);
- }
+ processFilesWithBatching();
else
{
- for (const auto & file : files)
- {
- if (quit)
- return true;
+ /// Process unprocessed file.
+ if (!current_batch_file.empty())
+ processFile(current_batch_file);
- processFile(file.second);
- }
+ while (pending_files.tryPop(current_batch_file))
+ processFile(current_batch_file);
}
-
- return true;
}
void StorageDistributedDirectoryMonitor::processFile(const std::string & file_path)
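The monitor refactor replaces re-scanning the directory on every iteration (the old getFiles() map) with a queue of pending file paths that is filled once from disk at startup and appended to by each insert. The real member here is, to my understanding, a ConcurrentBoundedQueue; the simplified stand-in below only mirrors the calls that appear in this patch (push/tryPop/clearAndFinish/isFinished/empty) and is illustrative, not the actual class:

// Illustrative sketch; not part of the patch.
#include <mutex>
#include <queue>
#include <string>

class PendingFilesSketch
{
public:
    bool push(std::string path) /// addAndSchedule(): enqueue a freshly written .bin file
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (finished)
            return false;
        files.push(std::move(path));
        return true;
    }

    bool tryPop(std::string & path) /// processFiles(): drain in FIFO order
    {
        std::lock_guard<std::mutex> lock(mutex);
        if (files.empty())
            return false;
        path = std::move(files.front());
        files.pop();
        return true;
    }

    void clearAndFinish() /// shutdown: drop what is left and reject new pushes
    {
        std::lock_guard<std::mutex> lock(mutex);
        files = {};
        finished = true;
    }

    bool isFinished() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return finished;
    }

    bool empty() const
    {
        std::lock_guard<std::mutex> lock(mutex);
        return files.empty();
    }

private:
    mutable std::mutex mutex;
    std::queue<std::string> files;
    bool finished = false;
};

The capacity passed at construction, std::numeric_limits<size_t>::max(), effectively makes the queue unbounded, so push() only fails after shutdown.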
@@ -649,7 +692,11 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
thread_trace_context->root_span.addAttribute(std::current_exception());
e.addMessage(fmt::format("While sending {}", file_path));
- maybeMarkAsBroken(file_path, e);
+ if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
+ {
+ markAsBroken(file_path);
+ current_batch_file.clear();
+ }
throw;
}
catch (...)
@@ -662,6 +709,7 @@ void StorageDistributedDirectoryMonitor::processFile(const std::string & file_pa
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
markAsSend(file_path);
+ current_batch_file.clear();
LOG_TRACE(log, "Finished processing `{}` (took {} ms)", file_path, watch.elapsedMilliseconds());
}
@@ -701,23 +749,19 @@ struct StorageDistributedDirectoryMonitor::BatchHeader
struct StorageDistributedDirectoryMonitor::Batch
{
- std::vector<UInt64> file_indices;
size_t total_rows = 0;
size_t total_bytes = 0;
bool recovered = false;
StorageDistributedDirectoryMonitor & parent;
- const std::map<UInt64, std::string> & file_index_to_path;
+ std::vector<std::string> files;
bool split_batch_on_failure = true;
bool fsync = false;
bool dir_fsync = false;
- Batch(
- StorageDistributedDirectoryMonitor & parent_,
- const std::map<UInt64, std::string> & file_index_to_path_)
+ explicit Batch(StorageDistributedDirectoryMonitor & parent_)
: parent(parent_)
- , file_index_to_path(file_index_to_path_)
, split_batch_on_failure(parent.split_batch_on_failure)
, fsync(parent.storage.getDistributedSettingsRef().fsync_after_insert)
, dir_fsync(parent.dir_fsync)
@@ -732,7 +776,7 @@ struct StorageDistributedDirectoryMonitor::Batch
void send()
{
- if (file_indices.empty())
+ if (files.empty())
return;
CurrentMetrics::Increment metric_increment{CurrentMetrics::DistributedSend};
@@ -775,7 +819,7 @@ struct StorageDistributedDirectoryMonitor::Batch
}
catch (const Exception & e)
{
- if (split_batch_on_failure && file_indices.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
+ if (split_batch_on_failure && files.size() > 1 && isSplittableErrorCode(e.code(), e.isRemoteException()))
{
tryLogCurrentException(parent.log, "Trying to split batch due to");
sendSeparateFiles();
@@ -795,44 +839,28 @@ struct StorageDistributedDirectoryMonitor::Batch
}
else
{
- std::vector<std::string> files;
- for (const auto && file_info : file_index_to_path | boost::adaptors::indexed())
- {
- if (file_info.index() > 8)
- {
- files.push_back("...");
- break;
- }
-
- files.push_back(file_info.value().second);
- }
- e.addMessage(fmt::format("While sending batch, nums: {}, files: {}", file_index_to_path.size(), fmt::join(files, "\n")));
-
+ e.addMessage(fmt::format("While sending a batch of {} files, files: {}", files.size(), fmt::join(files, "\n")));
throw;
}
}
if (!batch_broken)
{
- LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", file_indices.size(), watch.elapsedMilliseconds());
+ LOG_TRACE(parent.log, "Sent a batch of {} files (took {} ms).", files.size(), watch.elapsedMilliseconds());
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, parent.disk, parent.relative_path);
- for (UInt64 file_index : file_indices)
- parent.markAsSend(file_index_to_path.at(file_index));
+ for (const auto & file : files)
+ parent.markAsSend(file);
}
else if (!batch_marked_as_broken)
{
- LOG_ERROR(parent.log, "Marking a batch of {} files as broken.", file_indices.size());
+ LOG_ERROR(parent.log, "Marking a batch of {} files as broken, files: {}", files.size(), fmt::join(files, "\n"));
- for (UInt64 file_idx : file_indices)
- {
- auto file_path = file_index_to_path.find(file_idx);
- if (file_path != file_index_to_path.end())
- parent.markAsBroken(file_path->second);
- }
+ for (const auto & file : files)
+ parent.markAsBroken(file);
}
- file_indices.clear();
+ files.clear();
total_rows = 0;
total_bytes = 0;
recovered = false;
@@ -842,8 +870,11 @@ struct StorageDistributedDirectoryMonitor::Batch
void writeText(WriteBuffer & out)
{
- for (UInt64 file_idx : file_indices)
- out << file_idx << '\n';
+ for (const auto & file : files)
+ {
+ UInt64 file_index = parse<UInt64>(fs::path(file).stem());
+ out << file_index << '\n';
+ }
}
void readText(ReadBuffer & in)
@@ -852,8 +883,9 @@ struct StorageDistributedDirectoryMonitor::Batch
{
UInt64 idx;
in >> idx >> "\n";
- file_indices.push_back(idx);
+ files.push_back(fmt::format("{}/{}.bin", parent.path, idx));
}
+
recovered = true;
}
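
The on-disk batch file keeps its old format of one file index per line: writeText above derives each index back from the file name's stem, and readText rebuilds the absolute path from the monitor's directory. A standalone sketch of that round-trip, with illustrative names and std::stoull standing in for ClickHouse's parse<UInt64>:

#include <cstdint>
#include <filesystem>
#include <string>

namespace fs = std::filesystem;

// "/data/shard1/42.bin" -> 42 (the value writeText persists)
uint64_t indexFromPath(const std::string & file)
{
    return std::stoull(fs::path(file).stem().string());
}

// ("/data/shard1", 42) -> "/data/shard1/42.bin" (the path readText rebuilds)
std::string pathFromIndex(const std::string & dir, uint64_t idx)
{
    return (fs::path(dir) / (std::to_string(idx) + ".bin")).string();
}

This keeps the recovery file backward compatible while letting the in-memory Batch hold full paths instead of indices.
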
@@ -865,14 +897,9 @@ private:
IConnectionPool::Entry connection;
- for (UInt64 file_idx : file_indices)
+ for (const auto & file : files)
{
- auto file_path = file_index_to_path.find(file_idx);
- if (file_path == file_index_to_path.end())
- throw Exception(ErrorCodes::DISTRIBUTED_BROKEN_BATCH_INFO,
- "Failed to send batch: file with index {} is absent", file_idx);
-
- ReadBufferFromFile in(file_path->second);
+ ReadBufferFromFile in(file);
const auto & distributed_header = readDistributedHeader(in, parent.log);
OpenTelemetry::TracingContextHolder thread_trace_context(__PRETTY_FUNCTION__,
@@ -886,7 +913,7 @@ private:
compression_expected = connection->getCompression() == Protocol::Compression::Enable;
LOG_DEBUG(parent.log, "Sending a batch of {} files to {} ({} rows, {} bytes).",
- file_indices.size(),
+ files.size(),
connection->getDescription(),
formatReadableQuantity(total_rows),
formatReadableSizeWithBinarySuffix(total_bytes));
@@ -907,19 +934,11 @@ private:
{
size_t broken_files = 0;
- for (UInt64 file_idx : file_indices)
+ for (const auto & file : files)
{
- auto file_path = file_index_to_path.find(file_idx);
- if (file_path == file_index_to_path.end())
- {
- LOG_ERROR(parent.log, "Failed to send one file from batch: file with index {} is absent", file_idx);
- ++broken_files;
- continue;
- }
-
try
{
- ReadBufferFromFile in(file_path->second);
+ ReadBufferFromFile in(file);
const auto & distributed_header = readDistributedHeader(in, parent.log);
// this function is called in a separated thread, so we set up the trace context from the file
@@ -941,9 +960,11 @@ private:
}
catch (Exception & e)
{
- e.addMessage(fmt::format("While sending {}", file_path->second));
- parent.maybeMarkAsBroken(file_path->second, e);
- ++broken_files;
+ if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
+ {
+ parent.markAsBroken(file);
+ ++broken_files;
+ }
}
}
@@ -1023,13 +1044,18 @@ std::shared_ptr<ISource> StorageDistributedDirectoryMonitor::createSourceFromFile
return std::make_shared<DirectoryMonitorSource>(file_name);
}
}
-bool StorageDistributedDirectoryMonitor::addAndSchedule(size_t file_size, size_t ms)
+bool StorageDistributedDirectoryMonitor::addAndSchedule(const std::string & file_path, size_t file_size, size_t ms)
{
- if (quit)
+ /// NOTE: It is better not to throw in this case, since the file is already
+ /// on disk (see DistributedSink), and it will be processed next time.
+ if (pending_files.isFinished())
return false;
+ if (!pending_files.push(file_path))
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Cannot add pending file");
+
{
- std::lock_guard status_lock(status_mutex);
+ std::lock_guard lock(status_mutex);
metric_pending_files.add();
status.bytes_count += file_size;
++status.files_count;
@@ -1045,33 +1071,25 @@ StorageDistributedDirectoryMonitor::Status StorageDistributedDirectoryMonitor::g
return current_status;
}
-void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map<UInt64, std::string> & files)
+void StorageDistributedDirectoryMonitor::processFilesWithBatching()
{
- std::unordered_set<UInt64> file_indices_to_skip;
-
+ /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
if (fs::exists(current_batch_file_path))
{
- /// Possibly, we failed to send a batch on the previous iteration. Try to send exactly the same batch.
- Batch batch(*this, files);
+ Batch batch(*this);
ReadBufferFromFile in{current_batch_file_path};
batch.readText(in);
- file_indices_to_skip.insert(batch.file_indices.begin(), batch.file_indices.end());
batch.send();
+
+ auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
+ fs::remove(current_batch_file_path);
}
std::unordered_map<BatchHeader, Batch, BatchHeader::Hash> header_to_batch;
- for (const auto & file : files)
+ std::string file_path;
+ while (pending_files.tryPop(file_path))
{
- if (quit)
- return;
-
- UInt64 file_idx = file.first;
- const String & file_path = file.second;
-
- if (file_indices_to_skip.contains(file_idx))
- continue;
-
size_t total_rows = 0;
size_t total_bytes = 0;
Block header;
@@ -1110,8 +1128,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
}
catch (const Exception & e)
{
- if (maybeMarkAsBroken(file_path, e))
+ if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
{
+ markAsBroken(file_path);
tryLogCurrentException(log, "File is marked broken due to");
continue;
}
@@ -1125,9 +1144,9 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
std::move(distributed_header.client_info),
std::move(header)
);
- Batch & batch = header_to_batch.try_emplace(batch_header, *this, files).first->second;
+ Batch & batch = header_to_batch.try_emplace(batch_header, *this).first->second;
- batch.file_indices.push_back(file_idx);
+ batch.files.push_back(file_path);
batch.total_rows += total_rows;
batch.total_bytes += total_bytes;
@@ -1155,16 +1174,10 @@ void StorageDistributedDirectoryMonitor::processFilesWithBatching(const std::map
void StorageDistributedDirectoryMonitor::markAsBroken(const std::string & file_path)
{
- const auto last_path_separator_pos = file_path.rfind('/');
- const auto & base_path = file_path.substr(0, last_path_separator_pos + 1);
- const auto & file_name = file_path.substr(last_path_separator_pos + 1);
- const String & broken_path = fs::path(base_path) / "broken/";
- const String & broken_file_path = fs::path(broken_path) / file_name;
-
- fs::create_directory(broken_path);
+ const String & broken_file_path = fs::path(broken_path) / fs::path(file_path).filename();
auto dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, relative_path);
- auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, fs::path(relative_path) / "broken/");
+ auto broken_dir_sync_guard = getDirectorySyncGuard(dir_fsync, disk, broken_relative_path);
{
std::lock_guard status_lock(status_mutex);
@@ -1198,21 +1211,9 @@ void StorageDistributedDirectoryMonitor::markAsSend(const std::string & file_pat
fs::remove(file_path);
}
-bool StorageDistributedDirectoryMonitor::maybeMarkAsBroken(const std::string & file_path, const Exception & e)
-{
- /// Mark file as broken if necessary.
- if (isFileBrokenErrorCode(e.code(), e.isRemoteException()))
- {
- markAsBroken(file_path);
- return true;
- }
- else
- return false;
-}
-
std::string StorageDistributedDirectoryMonitor::getLoggerName() const
{
- return storage.getStorageID().getFullTableName() + ".DirectoryMonitor";
+ return storage.getStorageID().getFullTableName() + ".DirectoryMonitor." + disk->getName();
}
void StorageDistributedDirectoryMonitor::updatePath(const std::string & new_relative_path)
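
Taken together, the DirectoryMonitor.cpp changes replace the per-iteration directory scan, which rebuilt a file_index to file_path map every time, with a queue of pending paths that DistributedSink feeds directly. A minimal sketch of the queue contract the new code relies on follows; it is a simplified illustration of the semantics used here (push refuses new work after shutdown, tryPop is non-blocking, and the finished flag doubles as the old quit flag), not the actual ConcurrentBoundedQueue implementation:

#include <deque>
#include <mutex>
#include <string>

class PendingFilesSketch
{
public:
    /// Returns false after finish(); the caller then leaves the file on
    /// disk to be picked up by the next startup scan.
    bool push(const std::string & file)
    {
        std::lock_guard lock(mutex);
        if (finished)
            return false;
        queue.push_back(file);
        return true;
    }

    /// Non-blocking, as used by processFiles()/processFilesWithBatching().
    bool tryPop(std::string & file)
    {
        std::lock_guard lock(mutex);
        if (queue.empty())
            return false;
        file = std::move(queue.front());
        queue.pop_front();
        return true;
    }

    bool isFinished() const
    {
        std::lock_guard lock(mutex);
        return finished;
    }

    /// Replaces the old std::atomic<bool> quit flag.
    void finish()
    {
        std::lock_guard lock(mutex);
        finished = true;
    }

private:
    mutable std::mutex mutex;
    std::deque<std::string> queue;
    bool finished = false;
};
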
diff --git a/src/Storages/Distributed/DirectoryMonitor.h b/src/Storages/Distributed/DirectoryMonitor.h
index 7015fca0311..99e949ddcff 100644
--- a/src/Storages/Distributed/DirectoryMonitor.h
+++ b/src/Storages/Distributed/DirectoryMonitor.h
@@ -1,6 +1,7 @@
#pragma once
#include
+#include <Common/ConcurrentBoundedQueue.h>
#include
#include
@@ -38,7 +39,8 @@ public:
const std::string & relative_path_,
ConnectionPoolPtr pool_,
ActionBlocker & monitor_blocker_,
- BackgroundSchedulePool & bg_pool);
+ BackgroundSchedulePool & bg_pool,
+ bool initialize_from_disk);
~StorageDistributedDirectoryMonitor();
@@ -53,7 +55,7 @@ public:
static std::shared_ptr<ISource> createSourceFromFile(const String & file_name);
/// For scheduling via DistributedSink.
- bool addAndSchedule(size_t file_size, size_t ms);
+ bool addAndSchedule(const std::string & file_path, size_t file_size, size_t ms);
struct InternalStatus
{
@@ -78,14 +80,15 @@ public:
private:
void run();
- std::map<UInt64, std::string> getFiles();
- bool processFiles(const std::map<UInt64, std::string> & files);
+ bool hasPendingFiles() const;
+
+ void initializeFilesFromDisk();
+ void processFiles();
void processFile(const std::string & file_path);
- void processFilesWithBatching(const std::map<UInt64, std::string> & files);
+ void processFilesWithBatching();
void markAsBroken(const std::string & file_path);
void markAsSend(const std::string & file_path);
- bool maybeMarkAsBroken(const std::string & file_path, const Exception & e);
std::string getLoggerName() const;
@@ -95,25 +98,33 @@ private:
DiskPtr disk;
std::string relative_path;
std::string path;
+ std::string broken_relative_path;
+ std::string broken_path;
const bool should_batch_inserts = false;
const bool split_batch_on_failure = true;
const bool dir_fsync = false;
const size_t min_batched_block_size_rows = 0;
const size_t min_batched_block_size_bytes = 0;
- String current_batch_file_path;
+
+ /// This is pending data (due to some error) for should_batch_inserts==true
+ std::string current_batch_file_path;
+ /// This is pending data (due to some error) for should_batch_inserts==false
+ std::string current_batch_file;
struct BatchHeader;
struct Batch;
std::mutex status_mutex;
+
InternalStatus status;
ConcurrentBoundedQueue<std::string> pending_files;
+
const std::chrono::milliseconds default_sleep_time;
std::chrono::milliseconds sleep_time;
const std::chrono::milliseconds max_sleep_time;
std::chrono::time_point<std::chrono::system_clock> last_decrease_time {std::chrono::system_clock::now()};
- std::atomic<bool> quit {false};
std::mutex mutex;
Poco::Logger * log;
ActionBlocker & monitor_blocker;
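
Two details of the header are easy to miss. There are now two crash-recovery markers: current_batch_file_path holds a whole failed batch for should_batch_inserts==true, while current_batch_file holds the single in-flight file for should_batch_inserts==false, so in both modes the monitor can retry exactly the work that failed. And the new initialize_from_disk constructor flag, together with initializeFilesFromDisk(), suggests the directory is scanned once at startup to seed pending_files rather than on every run() iteration. A hedged sketch of what that startup scan could look like (the body is a guess for illustration; it is not the patch's code):

#include <cstdint>
#include <filesystem>
#include <map>
#include <string>

namespace fs = std::filesystem;

// Collect leftover .bin files sorted by their numeric index, so that the
// original insertion order is preserved when seeding the pending queue.
void initializeFromDiskSketch(const std::string & path,
                              std::map<uint64_t, std::string> & ordered)
{
    for (const auto & entry : fs::directory_iterator(path))
        if (entry.path().extension() == ".bin")
            ordered[std::stoull(entry.path().stem().string())] = entry.path().string();
    // Each entry of `ordered` would then be pushed into pending_files.
}
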
diff --git a/src/Storages/Distributed/DistributedSink.cpp b/src/Storages/Distributed/DistributedSink.cpp
index 38ff06f4744..8cee3e9ee91 100644
--- a/src/Storages/Distributed/DistributedSink.cpp
+++ b/src/Storages/Distributed/DistributedSink.cpp
@@ -724,6 +724,9 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
return guard;
};
+ std::vector<std::string> bin_files;
+ bin_files.reserve(dir_names.size());
+
auto it = dir_names.begin();
/// on first iteration write block to a temporary directory for subsequent
/// hardlinking to ensure the inode is not freed until we're done
@@ -802,8 +805,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
}
// Create hardlink here to reuse increment number
- const std::string block_file_path(fs::path(path) / file_name);
- createHardLink(first_file_tmp_path, block_file_path);
+ bin_files.push_back(fs::path(path) / file_name);
+ createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
}
++it;
@@ -814,8 +817,8 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
const std::string path(fs::path(disk_path) / (data_path + *it));
fs::create_directory(path);
- const std::string block_file_path(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
- createHardLink(first_file_tmp_path, block_file_path);
+ bin_files.push_back(fs::path(path) / (toString(storage.file_names_increment.get()) + ".bin"));
+ createHardLink(first_file_tmp_path, bin_files.back());
auto dir_sync_guard = make_directory_sync_guard(*it);
}
@@ -826,10 +829,13 @@ void DistributedSink::writeToShard(const Cluster::ShardInfo & shard_info, const
/// Notify
auto sleep_ms = context->getSettingsRef().distributed_directory_monitor_sleep_time_ms;
- for (const auto & dir_name : dir_names)
+ for (size_t i = 0; i < dir_names.size(); ++i)
{
+ const auto & dir_name = dir_names[i];
+ const auto & bin_file = bin_files[i];
+
auto & directory_monitor = storage.requireDirectoryMonitor(disk, dir_name, /* startup= */ false);
- directory_monitor.addAndSchedule(file_size, sleep_ms.totalMilliseconds());
+ directory_monitor.addAndSchedule(bin_file, file_size, sleep_ms.totalMilliseconds());
}
}
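
On the write side, DistributedSink now records every hardlinked .bin path so it can hand the exact path to addAndSchedule, instead of letting each monitor rediscover the file by scanning its directory; the invariant is that bin_files[i] is the link created under dir_names[i]. A simplified, illustrative sketch of the write-once, link-many step (names are not from the patch):

#include <filesystem>
#include <string>
#include <vector>

namespace fs = std::filesystem;

// The block is written once into tmp_file; every shard directory gets a
// hardlink, and the recorded paths are what the monitors are told to send.
std::vector<std::string> linkIntoDirectories(const std::string & tmp_file,
                                             const std::vector<std::string> & dirs,
                                             const std::string & file_name)
{
    std::vector<std::string> bin_files;
    bin_files.reserve(dirs.size());
    for (const auto & dir : dirs)
    {
        fs::create_directories(dir);
        bin_files.push_back(fs::path(dir) / file_name);
        fs::create_hard_link(tmp_file, bin_files.back());
    }
    return bin_files;
}
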
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.cpp b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
index cff6da85efc..03a3d4fbd72 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.cpp
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.cpp
@@ -193,7 +193,8 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
if (!metadata_snapshot->hasPartitionKey()) /// Table is not partitioned.
{
result.emplace_back(Block(block), Row{});
- result[0].offsets = chunk_offsets;
+ if (chunk_offsets != nullptr)
+ result[0].offsets = std::move(chunk_offsets->offsets);
return result;
}
@@ -230,7 +231,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
/// do not interfere with possible calculated primary key columns of the same name.
result.emplace_back(Block(block), get_partition(0));
if (!chunk_offsets_with_partition.empty())
- result[0].offsets = chunk_offsets_with_partition[0];
+ result[0].offsets = std::move(chunk_offsets_with_partition[0]->offsets);
return result;
}
@@ -245,7 +246,7 @@ BlocksWithPartition MergeTreeDataWriter::splitBlockIntoParts(
}
for (size_t i = 0; i < chunk_offsets_with_partition.size(); ++i)
- result[i].offsets = chunk_offsets_with_partition[i];
+ result[i].offsets = std::move(chunk_offsets_with_partition[i]->offsets);
return result;
}
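
The MergeTreeDataWriter change is purely about ownership: splitBlockIntoParts used to store a shared ChunkOffsetsPtr in each resulting block, and now it moves the underlying vector out, so no result keeps the source chunk's offset buffer alive and no reference counting is involved. An illustration with simplified stand-in types (not the real Block machinery):

#include <cstddef>
#include <memory>
#include <utility>
#include <vector>

struct ChunkOffsetsLike { std::vector<size_t> offsets; };
struct ResultLike { std::vector<size_t> offsets; };

void adoptOffsets(ResultLike & result, std::shared_ptr<ChunkOffsetsLike> chunk_offsets)
{
    if (chunk_offsets != nullptr)
        result.offsets = std::move(chunk_offsets->offsets);  /// leaves the source empty, which is fine since it is not reused
}
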
diff --git a/src/Storages/MergeTree/MergeTreeDataWriter.h b/src/Storages/MergeTree/MergeTreeDataWriter.h
index cbf8094f7fd..5dc7bf40922 100644
--- a/src/Storages/MergeTree/MergeTreeDataWriter.h
+++ b/src/Storages/MergeTree/MergeTreeDataWriter.h
@@ -22,15 +22,15 @@ struct BlockWithPartition
{
Block block;
Row partition;
- ChunkOffsetsPtr offsets;
+ std::vector<size_t> offsets;
BlockWithPartition(Block && block_, Row && partition_)
: block(block_), partition(std::move(partition_))
{
}
- BlockWithPartition(Block && block_, Row && partition_, ChunkOffsetsPtr chunk_offsets_)
- : block(block_), partition(std::move(partition_)), offsets(chunk_offsets_)
+ BlockWithPartition(Block && block_, Row && partition_, std::vector<size_t> && offsets_)
+ : block(block_), partition(std::move(partition_)), offsets(std::move(offsets_))
{
}
};
diff --git a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
index 7bd5df2b1dc..35e9cc46ebe 100644
--- a/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
+++ b/src/Storages/MergeTree/ReplicatedMergeTreeSink.cpp
@@ -41,15 +41,17 @@ struct ReplicatedMergeTreeSinkImpl<async_insert>::DelayedChunk
{
struct Partition
{
+ Poco::Logger * log;
MergeTreeDataWriter::TemporaryPart temp_part;
UInt64 elapsed_ns;
BlockIDsType block_id;
BlockWithPartition block_with_partition;
- std::unordered_map<String, size_t> block_id_to_offset_idx;
+ std::unordered_map<String, std::vector<size_t>> block_id_to_offset_idx;
Partition() = default;
- Partition(MergeTreeDataWriter::TemporaryPart && temp_part_, UInt64 elapsed_ns_, BlockIDsType && block_id_, BlockWithPartition && block_)
- : temp_part(std::move(temp_part_)),
+ Partition(Poco::Logger * log_, MergeTreeDataWriter::TemporaryPart && temp_part_, UInt64 elapsed_ns_, BlockIDsType && block_id_, BlockWithPartition && block_)
+ : log(log_),
+ temp_part(std::move(temp_part_)),
elapsed_ns(elapsed_ns_),
block_id(std::move(block_id_)),
block_with_partition(std::move(block_))
@@ -64,11 +66,105 @@ struct ReplicatedMergeTreeSinkImpl::DelayedChunk
block_id_to_offset_idx.clear();
for (size_t i = 0; i < block_id.size(); ++i)
{
- block_id_to_offset_idx[block_id[i]] = i;
+ block_id_to_offset_idx[block_id[i]].push_back(i);
}
}
}
+ /// This function checks whether the block contains duplicate inserts.
+ /// If so, we keep only one insert per set of duplicates.
+ bool filterSelfDuplicate()
+ {
+ if constexpr (async_insert)
+ {
+ std::vector<String> dup_block_ids;
+ for (const auto & [hash_id, offset_indexes] : block_id_to_offset_idx)
+ {
+ /// More than one insert has the same hash id; in this case, we should keep only one of them.
+ if (offset_indexes.size() > 1)
+ dup_block_ids.push_back(hash_id);
+ }
+ if (dup_block_ids.empty())
+ return false;
+
+ filterBlockDuplicate(dup_block_ids, true);
+ return true;
+ }
+ return false;
+ }
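
To make the self-deduplication above concrete: each block-id hash maps to the list of offset indexes that produced it, and any id with more than one entry is a duplicate within the same block. A standalone sketch of just the detection step, with std::string standing in for ClickHouse's String and illustrative names:

#include <cstddef>
#include <string>
#include <unordered_map>
#include <vector>

// Returns the hash ids that occur more than once in a block; keeping a
// single offset per duplicated id is then filterBlockDuplicate's job.
std::vector<std::string> findSelfDuplicates(
    const std::unordered_map<std::string, std::vector<size_t>> & id_to_offsets)
{
    std::vector<std::string> dup_ids;
    for (const auto & [hash_id, offsets] : id_to_offsets)
        if (offsets.size() > 1)
            dup_ids.push_back(hash_id);
    return dup_ids;
}
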
+
+ /// Remove the conflicting parts of the block so that they can be rewritten.
+ void filterBlockDuplicate(const std::vector<String> & block_paths, bool self_dedup)
+ {
+ if constexpr (async_insert)
+ {
+ std::vector