Merge remote-tracking branch 'origin/master' into pr-right-joins

This commit is contained in:
Igor Nikonov 2024-11-13 19:33:26 +00:00
commit b8731dc4f4
196 changed files with 3105 additions and 933 deletions

View File

@ -42,7 +42,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
@ -53,6 +52,7 @@ Upcoming meetups
Recently completed meetups
* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1

View File

@ -145,6 +145,7 @@
#define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
#define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
#define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability
#define TSA_RETURN_CAPABILITY(...) __attribute__((lock_returned(__VA_ARGS__))) /// to return capabilities in functions
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of

View File

@ -113,7 +113,9 @@ RUN clickhouse local -q 'SELECT 1' >/dev/null 2>&1 && exit 0 || : \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
&& apt-get autoremove --purge -yq dirmngr gnupg2
&& apt-get autoremove --purge -yq dirmngr gnupg2 \
&& chmod ugo+Xrw -R /etc/clickhouse-server /etc/clickhouse-client
# The last chmod is here to make the next one is No-op in docker official library Dockerfile
# post install
# we need to allow "others" access to clickhouse folder, because docker container

View File

@ -162,7 +162,7 @@ if [ -n "${RUN_INITDB_SCRIPTS}" ]; then
tries=${CLICKHOUSE_INIT_TIMEOUT:-1000}
while ! wget --spider --no-check-certificate -T 1 -q "$URL" 2>/dev/null; do
if [ "$tries" -le "0" ]; then
echo >&2 'ClickHouse init process failed.'
echo >&2 'ClickHouse init process timeout.'
exit 1
fi
tries=$(( tries-1 ))

View File

@ -46,7 +46,7 @@ Detailed table sizes with scale factor 100:
| orders | 150.000.000 | 6.15 GB |
| lineitem | 600.00.00 | 26.69 GB |
(The table sizes in ClickHouse are taken from `system.tables.total_bytes` and based on below table definitions.
(Compressed sizes in ClickHouse are taken from `system.tables.total_bytes` and based on below table definitions.)
Now create tables in ClickHouse.
@ -166,10 +166,37 @@ clickhouse-client --format_csv_delimiter '|' --query "INSERT INTO orders FORMAT
clickhouse-client --format_csv_delimiter '|' --query "INSERT INTO lineitem FORMAT CSV" < lineitem.tbl
```
The queries are generated by `./qgen -s <scaling_factor>`. Example queries for `s = 100`:
:::note
Instead of using tpch-kit and generating the tables by yourself, you can alternatively import the data from a public S3 bucket. Make sure
to create empty tables first using above `CREATE` statements.
```sql
-- Scaling factor 1
INSERT INTO nation SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/nation.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO region SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/region.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO part SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/part.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO supplier SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/supplier.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO partsupp SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/partsupp.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO customer SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/customer.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO orders SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/orders.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO lineitem SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/1/lineitem.tbl', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
-- Scaling factor 100
INSERT INTO nation SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/nation.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO region SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/region.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO part SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/part.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO supplier SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/supplier.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO partsupp SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/partsupp.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO customer SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/customer.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO orders SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/orders.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
INSERT INTO lineitem SELECT * FROM s3('https://clickhouse-datasets.s3.amazonaws.com/h/100/lineitem.tbl.gz', NOSIGN, CSV) SETTINGS format_csv_delimiter = '|', input_format_defaults_for_omitted_fields = 1, input_format_csv_empty_as_default = 1;
````
:::
## Queries
The queries are generated by `./qgen -s <scaling_factor>`. Example queries for `s = 100`:
**Correctness**
The result of the queries agrees with the official results unless mentioned otherwise. To verify, generate a TPC-H database with scale

View File

@ -7,119 +7,4 @@ toc_hidden: true
# List of Aggregate Functions
Standard aggregate functions:
- [count](../reference/count.md)
- [min](../reference/min.md)
- [max](../reference/max.md)
- [sum](../reference/sum.md)
- [avg](../reference/avg.md)
- [any](../reference/any.md)
- [stddevPop](../reference/stddevpop.md)
- [stddevPopStable](../reference/stddevpopstable.md)
- [stddevSamp](../reference/stddevsamp.md)
- [stddevSampStable](../reference/stddevsampstable.md)
- [varPop](../reference/varpop.md)
- [varSamp](../reference/varsamp.md)
- [corr](../reference/corr.md)
- [corr](../reference/corrstable.md)
- [corrMatrix](../reference/corrmatrix.md)
- [covarPop](../reference/covarpop.md)
- [covarStable](../reference/covarpopstable.md)
- [covarPopMatrix](../reference/covarpopmatrix.md)
- [covarSamp](../reference/covarsamp.md)
- [covarSampStable](../reference/covarsampstable.md)
- [covarSampMatrix](../reference/covarsampmatrix.md)
- [entropy](../reference/entropy.md)
- [exponentialMovingAverage](../reference/exponentialmovingaverage.md)
- [intervalLengthSum](../reference/intervalLengthSum.md)
- [kolmogorovSmirnovTest](../reference/kolmogorovsmirnovtest.md)
- [mannwhitneyutest](../reference/mannwhitneyutest.md)
- [median](../reference/median.md)
- [rankCorr](../reference/rankCorr.md)
- [sumKahan](../reference/sumkahan.md)
- [studentTTest](../reference/studentttest.md)
- [welchTTest](../reference/welchttest.md)
ClickHouse-specific aggregate functions:
- [aggThrow](../reference/aggthrow.md)
- [analysisOfVariance](../reference/analysis_of_variance.md)
- [any](../reference/any.md)
- [anyHeavy](../reference/anyheavy.md)
- [anyLast](../reference/anylast.md)
- [boundingRatio](../reference/boundrat.md)
- [first_value](../reference/first_value.md)
- [last_value](../reference/last_value.md)
- [argMin](../reference/argmin.md)
- [argMax](../reference/argmax.md)
- [avgWeighted](../reference/avgweighted.md)
- [topK](../reference/topk.md)
- [topKWeighted](../reference/topkweighted.md)
- [deltaSum](../reference/deltasum.md)
- [deltaSumTimestamp](../reference/deltasumtimestamp.md)
- [flameGraph](../reference/flame_graph.md)
- [groupArray](../reference/grouparray.md)
- [groupArrayLast](../reference/grouparraylast.md)
- [groupUniqArray](../reference/groupuniqarray.md)
- [groupArrayInsertAt](../reference/grouparrayinsertat.md)
- [groupArrayMovingAvg](../reference/grouparraymovingavg.md)
- [groupArrayMovingSum](../reference/grouparraymovingsum.md)
- [groupArraySample](../reference/grouparraysample.md)
- [groupArraySorted](../reference/grouparraysorted.md)
- [groupArrayIntersect](../reference/grouparrayintersect.md)
- [groupBitAnd](../reference/groupbitand.md)
- [groupBitOr](../reference/groupbitor.md)
- [groupBitXor](../reference/groupbitxor.md)
- [groupBitmap](../reference/groupbitmap.md)
- [groupBitmapAnd](../reference/groupbitmapand.md)
- [groupBitmapOr](../reference/groupbitmapor.md)
- [groupBitmapXor](../reference/groupbitmapxor.md)
- [sumWithOverflow](../reference/sumwithoverflow.md)
- [sumMap](../reference/summap.md)
- [sumMapWithOverflow](../reference/summapwithoverflow.md)
- [sumMapFiltered](../parametric-functions.md/#summapfiltered)
- [sumMapFilteredWithOverflow](../parametric-functions.md/#summapfilteredwithoverflow)
- [minMap](../reference/minmap.md)
- [maxMap](../reference/maxmap.md)
- [skewSamp](../reference/skewsamp.md)
- [skewPop](../reference/skewpop.md)
- [kurtSamp](../reference/kurtsamp.md)
- [kurtPop](../reference/kurtpop.md)
- [uniq](../reference/uniq.md)
- [uniqExact](../reference/uniqexact.md)
- [uniqCombined](../reference/uniqcombined.md)
- [uniqCombined64](../reference/uniqcombined64.md)
- [uniqHLL12](../reference/uniqhll12.md)
- [uniqTheta](../reference/uniqthetasketch.md)
- [quantile](../reference/quantile.md)
- [quantiles](../reference/quantiles.md)
- [quantileExact](../reference/quantileexact.md)
- [quantileExactLow](../reference/quantileexact.md#quantileexactlow)
- [quantileExactHigh](../reference/quantileexact.md#quantileexacthigh)
- [quantileExactWeighted](../reference/quantileexactweighted.md)
- [quantileTiming](../reference/quantiletiming.md)
- [quantileTimingWeighted](../reference/quantiletimingweighted.md)
- [quantileDeterministic](../reference/quantiledeterministic.md)
- [quantileTDigest](../reference/quantiletdigest.md)
- [quantileTDigestWeighted](../reference/quantiletdigestweighted.md)
- [quantileBFloat16](../reference/quantilebfloat16.md#quantilebfloat16)
- [quantileBFloat16Weighted](../reference/quantilebfloat16.md#quantilebfloat16weighted)
- [quantileDD](../reference/quantileddsketch.md#quantileddsketch)
- [simpleLinearRegression](../reference/simplelinearregression.md)
- [singleValueOrNull](../reference/singlevalueornull.md)
- [stochasticLinearRegression](../reference/stochasticlinearregression.md)
- [stochasticLogisticRegression](../reference/stochasticlogisticregression.md)
- [categoricalInformationValue](../reference/categoricalinformationvalue.md)
- [contingency](../reference/contingency.md)
- [cramersV](../reference/cramersv.md)
- [cramersVBiasCorrected](../reference/cramersvbiascorrected.md)
- [theilsU](../reference/theilsu.md)
- [maxIntersections](../reference/maxintersections.md)
- [maxIntersectionsPosition](../reference/maxintersectionsposition.md)
- [meanZTest](../reference/meanztest.md)
- [quantileGK](../reference/quantileGK.md)
- [quantileInterpolatedWeighted](../reference/quantileinterpolatedweighted.md)
- [sparkBar](../reference/sparkbar.md)
- [sumCount](../reference/sumcount.md)
- [largestTriangleThreeBuckets](../reference/largestTriangleThreeBuckets.md)
ClickHouse supports all standard SQL aggregate functions ([sum](../reference/sum.md), [avg](../reference/avg.md), [min](../reference/min.md), [max](../reference/max.md), [count](../reference/count.md)), as well as a wide range of other aggregate functions.

View File

@ -6,7 +6,9 @@ sidebar_label: AggregateFunction
# AggregateFunction
Aggregate functions can have an implementation-defined intermediate state that can be serialized to an `AggregateFunction(...)` data type and stored in a table, usually, by means of [a materialized view](../../sql-reference/statements/create/view.md). The common way to produce an aggregate function state is by calling the aggregate function with the `-State` suffix. To get the final result of aggregation in the future, you must use the same aggregate function with the `-Merge`suffix.
Aggregate functions have an implementation-defined intermediate state that can be serialized to an `AggregateFunction(...)` data type and stored in a table, usually, by means of [a materialized view](../../sql-reference/statements/create/view.md).
The common way to produce an aggregate function state is by calling the aggregate function with the `-State` suffix.
To get the final result of aggregation in the future, you must use the same aggregate function with the `-Merge`suffix.
`AggregateFunction(name, types_of_arguments...)` — parametric data type.

View File

@ -6,29 +6,8 @@ sidebar_position: 1
# Data Types in ClickHouse
ClickHouse can store various kinds of data in table cells. This section describes the supported data types and special considerations for using and/or implementing them if any.
This section describes the data types supported by ClickHouse, for example [integers](int-uint.md), [floats](float.md) and [strings](string.md).
:::note
You can check whether a data type name is case-sensitive in the [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) table.
:::
ClickHouse data types include:
- **Integer types**: [signed and unsigned integers](./int-uint.md) (`UInt8`, `UInt16`, `UInt32`, `UInt64`, `UInt128`, `UInt256`, `Int8`, `Int16`, `Int32`, `Int64`, `Int128`, `Int256`)
- **Floating-point numbers**: [floats](./float.md)(`Float32` and `Float64`) and [`Decimal` values](./decimal.md)
- **Boolean**: ClickHouse has a [`Boolean` type](./boolean.md)
- **Strings**: [`String`](./string.md) and [`FixedString`](./fixedstring.md)
- **Dates**: use [`Date`](./date.md) and [`Date32`](./date32.md) for days, and [`DateTime`](./datetime.md) and [`DateTime64`](./datetime64.md) for instances in time
- **Object**: the [`Object`](./json.md) stores a JSON document in a single column (deprecated)
- **JSON**: the [`JSON` object](./newjson.md) stores a JSON document in a single column
- **UUID**: a performant option for storing [`UUID` values](./uuid.md)
- **Low cardinality types**: use an [`Enum`](./enum.md) when you have a handful of unique values, or use [`LowCardinality`](./lowcardinality.md) when you have up to 10,000 unique values of a column
- **Arrays**: any column can be defined as an [`Array` of values](./array.md)
- **Maps**: use [`Map`](./map.md) for storing key/value pairs
- **Aggregation function types**: use [`SimpleAggregateFunction`](./simpleaggregatefunction.md) and [`AggregateFunction`](./aggregatefunction.md) for storing the intermediate status of aggregate function results
- **Nested data structures**: A [`Nested` data structure](./nested-data-structures/index.md) is like a table inside a cell
- **Tuples**: A [`Tuple` of elements](./tuple.md), each having an individual type.
- **Nullable**: [`Nullable`](./nullable.md) allows you to store a value as `NULL` when a value is "missing" (instead of the column settings its default value for the data type)
- **IP addresses**: use [`IPv4`](./ipv4.md) and [`IPv6`](./ipv6.md) to efficiently store IP addresses
- **Geo types**: for [geographical data](./geo.md), including `Point`, `Ring`, `Polygon` and `MultiPolygon`
- **Special data types**: including [`Expression`](./special-data-types/expression.md), [`Set`](./special-data-types/set.md), [`Nothing`](./special-data-types/nothing.md) and [`Interval`](./special-data-types/interval.md)
System table [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) provides an
overview of all available data types.
It also shows whether a data type is an alias to another data type and its name is case-sensitive (e.g. `bool` vs. `BOOL`).

View File

@ -7,7 +7,7 @@ keywords: [object, data type]
# Object Data Type (deprecated)
**This feature is not production-ready and is now deprecated.** If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-formats/json/overview) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864).
**This feature is not production-ready and deprecated.** If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-formats/json/overview) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864).
<hr />

View File

@ -5,7 +5,9 @@ sidebar_label: SimpleAggregateFunction
---
# SimpleAggregateFunction
`SimpleAggregateFunction(name, types_of_arguments...)` data type stores current value of the aggregate function, and does not store its full state as [`AggregateFunction`](../../sql-reference/data-types/aggregatefunction.md) does. This optimization can be applied to functions for which the following property holds: the result of applying a function `f` to a row set `S1 UNION ALL S2` can be obtained by applying `f` to parts of the row set separately, and then again applying `f` to the results: `f(S1 UNION ALL S2) = f(f(S1) UNION ALL f(S2))`. This property guarantees that partial aggregation results are enough to compute the combined one, so we do not have to store and process any extra data.
`SimpleAggregateFunction(name, types_of_arguments...)` data type stores current value (intermediate state) of the aggregate function, but not its full state as [`AggregateFunction`](../../sql-reference/data-types/aggregatefunction.md) does.
This optimization can be applied to functions for which the following property holds: the result of applying a function `f` to a row set `S1 UNION ALL S2` can be obtained by applying `f` to parts of the row set separately, and then again applying `f` to the results: `f(S1 UNION ALL S2) = f(f(S1) UNION ALL f(S2))`.
This property guarantees that partial aggregation results are enough to compute the combined one, so we do not have to store and process any extra data.
The common way to produce an aggregate function value is by calling the aggregate function with the [-SimpleState](../../sql-reference/aggregate-functions/combinators.md#agg-functions-combinator-simplestate) suffix.

View File

@ -4773,7 +4773,7 @@ Result:
## toUTCTimestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp
Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4799,14 +4799,14 @@ SELECT toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai');
Result:
``` text
┌─toUTCTimestamp(toDateTime('2023-03-16'),'Asia/Shanghai')┐
┌─toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai')┐
│ 2023-03-15 16:00:00 │
└─────────────────────────────────────────────────────────┘
```
## fromUTCTimestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp
Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@ -4832,7 +4832,7 @@ SELECT fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai')
Result:
``` text
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3),'Asia/Shanghai')─┐
┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3), 'Asia/Shanghai')─┐
│ 2023-03-16 18:00:00.000 │
└─────────────────────────────────────────────────────────────────────────┘
```

View File

@ -5,70 +5,4 @@ sidebar_position: 62
title: "Geo Functions"
---
## Geographical Coordinates Functions
- [greatCircleDistance](./coordinates.md#greatcircledistance)
- [geoDistance](./coordinates.md#geodistance)
- [greatCircleAngle](./coordinates.md#greatcircleangle)
- [pointInEllipses](./coordinates.md#pointinellipses)
- [pointInPolygon](./coordinates.md#pointinpolygon)
## Geohash Functions
- [geohashEncode](./geohash.md#geohashencode)
- [geohashDecode](./geohash.md#geohashdecode)
- [geohashesInBox](./geohash.md#geohashesinbox)
## H3 Indexes Functions
- [h3IsValid](./h3.md#h3isvalid)
- [h3GetResolution](./h3.md#h3getresolution)
- [h3EdgeAngle](./h3.md#h3edgeangle)
- [h3EdgeLengthM](./h3.md#h3edgelengthm)
- [h3EdgeLengthKm](./h3.md#h3edgelengthkm)
- [geoToH3](./h3.md#geotoh3)
- [h3ToGeo](./h3.md#h3togeo)
- [h3ToGeoBoundary](./h3.md#h3togeoboundary)
- [h3kRing](./h3.md#h3kring)
- [h3GetBaseCell](./h3.md#h3getbasecell)
- [h3HexAreaM2](./h3.md#h3hexaream2)
- [h3HexAreaKm2](./h3.md#h3hexareakm2)
- [h3IndexesAreNeighbors](./h3.md#h3indexesareneighbors)
- [h3ToChildren](./h3.md#h3tochildren)
- [h3ToParent](./h3.md#h3toparent)
- [h3ToString](./h3.md#h3tostring)
- [stringToH3](./h3.md#stringtoh3)
- [h3GetResolution](./h3.md#h3getresolution)
- [h3IsResClassIII](./h3.md#h3isresclassiii)
- [h3IsPentagon](./h3.md#h3ispentagon)
- [h3GetFaces](./h3.md#h3getfaces)
- [h3CellAreaM2](./h3.md#h3cellaream2)
- [h3CellAreaRads2](./h3.md#h3cellarearads2)
- [h3ToCenterChild](./h3.md#h3tocenterchild)
- [h3ExactEdgeLengthM](./h3.md#h3exactedgelengthm)
- [h3ExactEdgeLengthKm](./h3.md#h3exactedgelengthkm)
- [h3ExactEdgeLengthRads](./h3.md#h3exactedgelengthrads)
- [h3NumHexagons](./h3.md#h3numhexagons)
- [h3Line](./h3.md#h3line)
- [h3Distance](./h3.md#h3distance)
- [h3HexRing](./h3.md#h3hexring)
- [h3GetUnidirectionalEdge](./h3.md#h3getunidirectionaledge)
- [h3UnidirectionalEdgeIsValid](./h3.md#h3unidirectionaledgeisvalid)
- [h3GetOriginIndexFromUnidirectionalEdge](./h3.md#h3getoriginindexfromunidirectionaledge)
- [h3GetDestinationIndexFromUnidirectionalEdge](./h3.md#h3getdestinationindexfromunidirectionaledge)
- [h3GetIndexesFromUnidirectionalEdge](./h3.md#h3getindexesfromunidirectionaledge)
- [h3GetUnidirectionalEdgesFromHexagon](./h3.md#h3getunidirectionaledgesfromhexagon)
- [h3GetUnidirectionalEdgeBoundary](./h3.md#h3getunidirectionaledgeboundary)
## S2 Index Functions
- [geoToS2](./s2.md#geotos2)
- [s2ToGeo](./s2.md#s2togeo)
- [s2GetNeighbors](./s2.md#s2getneighbors)
- [s2CellsIntersect](./s2.md#s2cellsintersect)
- [s2CapContains](./s2.md#s2capcontains)
- [s2CapUnion](./s2.md#s2capunion)
- [s2RectAdd](./s2.md#s2rectadd)
- [s2RectContains](./s2.md#s2rectcontains)
- [s2RectUnion](./s2.md#s2rectunion)
- [s2RectIntersection](./s2.md#s2rectintersection)
Functions for working with geometric objects, for example [to calculate distances between points on a sphere](./coordinates.md), [compute geohashes](./geohash.md), and work with [h3 indexes](./h3.md).

View File

@ -24,7 +24,7 @@ All expressions in a query that have the same AST (the same record or same resul
## Types of Results
All functions return a single return as the result (not several values, and not zero values). The type of result is usually defined only by the types of arguments, not by the values. Exceptions are the tupleElement function (the a.N operator), and the toFixedString function.
All functions return a single value as the result (not several values, and not zero values). The type of result is usually defined only by the types of arguments, not by the values. Exceptions are the tupleElement function (the a.N operator), and the toFixedString function.
## Constants

View File

@ -279,7 +279,7 @@ For columns with a new or updated `MATERIALIZED` value expression, all existing
For columns with a new or updated `DEFAULT` value expression, the behavior depends on the ClickHouse version:
- In ClickHouse < v24.2, all existing rows are rewritten.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
Syntax:

View File

@ -6,16 +6,4 @@ sidebar_label: CREATE
# CREATE Queries
Create queries make a new entity of one of the following kinds:
- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
- [TABLE](/docs/en/sql-reference/statements/create/table.md)
- [VIEW](/docs/en/sql-reference/statements/create/view.md)
- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
- [USER](/docs/en/sql-reference/statements/create/user.md)
- [ROLE](/docs/en/sql-reference/statements/create/role.md)
- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)
CREATE queries create (for example) new [databases](/docs/en/sql-reference/statements/create/database.md), [tables](/docs/en/sql-reference/statements/create/table.md) and [views](/docs/en/sql-reference/statements/create/view.md).

View File

@ -6,27 +6,4 @@ sidebar_label: List of statements
# ClickHouse SQL Statements
Statements represent various kinds of action you can perform using SQL queries. Each kind of statement has its own syntax and usage details that are described separately:
- [SELECT](/docs/en/sql-reference/statements/select/index.md)
- [INSERT INTO](/docs/en/sql-reference/statements/insert-into.md)
- [CREATE](/docs/en/sql-reference/statements/create/index.md)
- [ALTER](/docs/en/sql-reference/statements/alter/index.md)
- [SYSTEM](/docs/en/sql-reference/statements/system.md)
- [SHOW](/docs/en/sql-reference/statements/show.md)
- [GRANT](/docs/en/sql-reference/statements/grant.md)
- [REVOKE](/docs/en/sql-reference/statements/revoke.md)
- [ATTACH](/docs/en/sql-reference/statements/attach.md)
- [CHECK TABLE](/docs/en/sql-reference/statements/check-table.md)
- [DESCRIBE TABLE](/docs/en/sql-reference/statements/describe-table.md)
- [DETACH](/docs/en/sql-reference/statements/detach.md)
- [DROP](/docs/en/sql-reference/statements/drop.md)
- [EXISTS](/docs/en/sql-reference/statements/exists.md)
- [KILL](/docs/en/sql-reference/statements/kill.md)
- [OPTIMIZE](/docs/en/sql-reference/statements/optimize.md)
- [RENAME](/docs/en/sql-reference/statements/rename.md)
- [SET](/docs/en/sql-reference/statements/set.md)
- [SET ROLE](/docs/en/sql-reference/statements/set-role.md)
- [TRUNCATE](/docs/en/sql-reference/statements/truncate.md)
- [USE](/docs/en/sql-reference/statements/use.md)
- [EXPLAIN](/docs/en/sql-reference/statements/explain.md)
Users interact with ClickHouse using SQL statements. ClickHouse supports common SQL statements like [SELECT](select/index.md) and [CREATE](create/index.md), but it also provides specialized statements like [KILL](kill.md) and [OPTIMIZE](optimize.md).

View File

@ -12,9 +12,12 @@
#include <Compression/ParallelCompressedWriteBuffer.h>
#include <Compression/CompressedReadBuffer.h>
#include <Compression/CompressedReadBufferFromFile.h>
#include <Compression/getCompressionCodecForFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <IO/copyData.h>
#include <Parsers/parseQuery.h>
#include <Parsers/queryToString.h>
#include <Parsers/ExpressionElementParsers.h>
#include <Compression/CompressionFactory.h>
#include <Common/TerminalSize.h>
@ -43,29 +46,24 @@ namespace CurrentMetrics
namespace
{
/// Outputs sizes of uncompressed and compressed blocks for compressed file.
/// Outputs method, sizes of uncompressed and compressed blocks for compressed file.
void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
while (!in.eof())
{
in.ignore(16); /// checksum
char header[COMPRESSED_BLOCK_HEADER_SIZE];
in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
UInt32 size_compressed = unalignedLoad<UInt32>(&header[1]);
UInt32 size_compressed;
UInt32 size_decompressed;
auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, true /* skip_to_next_block */);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw DB::Exception(DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed. Most likely corrupted data.");
UInt32 size_decompressed = unalignedLoad<UInt32>(&header[5]);
DB::writeText(queryToString(codec->getFullCodecDesc()), out);
DB::writeChar('\t', out);
DB::writeText(size_decompressed, out);
DB::writeChar('\t', out);
DB::writeText(size_compressed, out);
DB::writeChar('\n', out);
in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
}

View File

@ -14,6 +14,7 @@
#include <Databases/registerDatabases.h>
#include <Databases/DatabaseFilesystem.h>
#include <Databases/DatabaseMemory.h>
#include <Databases/DatabaseAtomic.h>
#include <Databases/DatabasesOverlay.h>
#include <Storages/System/attachSystemTables.h>
#include <Storages/System/attachInformationSchemaTables.h>
@ -257,12 +258,12 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str
return system_database;
}
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context_)
static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context)
{
auto databaseCombiner = std::make_shared<DatabasesOverlay>(name_, context_);
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context_));
databaseCombiner->registerNextDatabase(std::make_shared<DatabaseMemory>(name_, context_));
return databaseCombiner;
auto overlay = std::make_shared<DatabasesOverlay>(name_, context);
overlay->registerNextDatabase(std::make_shared<DatabaseAtomic>(name_, fs::weakly_canonical(context->getPath()), UUIDHelpers::generateV4(), context));
overlay->registerNextDatabase(std::make_shared<DatabaseFilesystem>(name_, "", context));
return overlay;
}
/// If path is specified and not empty, will try to setup server environment and load existing metadata
@ -811,7 +812,12 @@ void LocalServer::processConfig()
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
std::string default_database = server_settings[ServerSetting::default_database];
DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
{
DatabasePtr database = createClickHouseLocalDatabaseOverlay(default_database, global_context);
if (UUID uuid = database->getUUID(); uuid != UUIDHelpers::Nil)
DatabaseCatalog::instance().addUUIDMapping(uuid);
DatabaseCatalog::instance().attachDatabase(default_database, database);
}
global_context->setCurrentDatabase(default_database);
if (getClientConfiguration().has("path"))

View File

@ -1,27 +1,22 @@
#include <unistd.h>
#include <fcntl.h>
#include <base/phdr_cache.h>
#include <Common/EnvironmentChecks.h>
#include <Common/StringUtils.h>
#include <Common/getHashOfLoadedBinary.h>
#include <new>
#include <iostream>
#include <vector>
#include <string>
#include <string_view>
#include <utility> /// pair
#include <fmt/format.h>
#if defined(SANITIZE_COVERAGE)
# include <Common/Coverage.h>
#endif
#include "config.h"
#include "config_tools.h"
#include <Common/EnvironmentChecks.h>
#include <Common/Coverage.h>
#include <Common/StringUtils.h>
#include <Common/getHashOfLoadedBinary.h>
#include <Common/IO.h>
#include <base/phdr_cache.h>
#include <base/coverage.h>
#include <filesystem>
#include <iostream>
#include <new>
#include <string>
#include <string_view>
#include <utility> /// pair
#include <vector>
/// Universal executable for various clickhouse applications
int mainEntryClickHouseServer(int argc, char ** argv);
@ -238,9 +233,12 @@ int main(int argc_, char ** argv_)
/// clickhouse # spawn local
/// clickhouse local # spawn local
/// clickhouse "select ..." # spawn local
/// clickhouse /tmp/repro --enable-analyzer
///
if (main_func == printHelp && !argv.empty() && (argv.size() == 1 || argv[1][0] == '-'
|| std::string_view(argv[1]).contains(' ')))
std::error_code ec;
if (main_func == printHelp && !argv.empty()
&& (argv.size() == 1 || argv[1][0] == '-' || std::string_view(argv[1]).contains(' ')
|| std::filesystem::is_regular_file(std::filesystem::path{argv[1]}, ec)))
{
main_func = mainEntryClickHouseLocal;
}

View File

@ -88,6 +88,7 @@ void FunctionNode::resolveAsFunction(FunctionBasePtr function_value)
function_name = function_value->getName();
function = std::move(function_value);
kind = FunctionKind::ORDINARY;
nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_function_value)
@ -95,6 +96,12 @@ void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_fun
function_name = aggregate_function_value->getName();
function = std::move(aggregate_function_value);
kind = FunctionKind::AGGREGATE;
/** When the function is resolved, we do not need the nulls action anymore.
* The only thing that the nulls action does is map from one function to another.
* Thus, the nulls action is encoded in the function name and does not make sense anymore.
* Keeping the nulls action may lead to incorrect comparison of functions, e.g., count() and count() IGNORE NULLS are the same function.
*/
nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsWindowFunction(AggregateFunctionPtr window_function_value)

View File

@ -48,9 +48,15 @@ ASTPtr JoinNode::toASTTableJoin() const
auto join_expression_ast = children[join_expression_child_index]->toAST();
if (is_using_join_expression)
join_ast->using_expression_list = std::move(join_expression_ast);
{
join_ast->using_expression_list = join_expression_ast;
join_ast->children.push_back(join_ast->using_expression_list);
}
else
join_ast->on_expression = std::move(join_expression_ast);
{
join_ast->on_expression = join_expression_ast;
join_ast->children.push_back(join_ast->on_expression);
}
}
return join_ast;

View File

@ -85,10 +85,9 @@ QueryTreeNodePtr createResolvedFunction(const ContextPtr & context, const String
}
FunctionNodePtr createResolvedAggregateFunction(
const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {}, NullsAction action = NullsAction::EMPTY)
const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {})
{
auto function_node = std::make_shared<FunctionNode>(name);
function_node->setNullsAction(action);
if (!parameters.empty())
{
@ -100,7 +99,7 @@ FunctionNodePtr createResolvedAggregateFunction(
function_node->getArguments().getNodes() = { argument };
AggregateFunctionProperties properties;
auto aggregate_function = AggregateFunctionFactory::instance().get(name, action, {argument->getResultType()}, parameters, properties);
auto aggregate_function = AggregateFunctionFactory::instance().get(name, NullsAction::EMPTY, {argument->getResultType()}, parameters, properties);
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
return function_node;

View File

@ -3,7 +3,6 @@
#include <memory>
#include <Common/Exception.h>
#include "Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h"
#include <IO/WriteHelpers.h>
#include <IO/Operators.h>
@ -16,39 +15,39 @@
#include <Analyzer/ColumnNode.h>
#include <Analyzer/FunctionNode.h>
#include <Analyzer/InDepthQueryTreeVisitor.h>
#include <Analyzer/Utils.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/QueryAnalysisPass.h>
#include <Analyzer/Passes/RemoveUnusedProjectionColumnsPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/CountDistinctPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Passes/FunctionToSubcolumnsPass.h>
#include <Analyzer/Passes/RewriteAggregateFunctionWithIfPass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/MultiIfToIfPass.h>
#include <Analyzer/Passes/IfConstantConditionPass.h>
#include <Analyzer/Passes/IfChainToMultiIfPass.h>
#include <Analyzer/Passes/OrderByTupleEliminationPass.h>
#include <Analyzer/Passes/NormalizeCountVariantsPass.h>
#include <Analyzer/Passes/AggregateFunctionsArithmericOperationsPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/OrderByLimitByDuplicateEliminationPass.h>
#include <Analyzer/Passes/FuseFunctionsPass.h>
#include <Analyzer/Passes/OptimizeGroupByFunctionKeysPass.h>
#include <Analyzer/Passes/IfTransformStringsToEnumPass.h>
#include <Analyzer/Passes/ConvertOrLikeChainPass.h>
#include <Analyzer/Passes/OptimizeRedundantFunctionsInOrderByPass.h>
#include <Analyzer/Passes/GroupingFunctionsResolvePass.h>
#include <Analyzer/Passes/AutoFinalOnQueryPass.h>
#include <Analyzer/Passes/ArrayExistsToHasPass.h>
#include <Analyzer/Passes/ComparisonTupleEliminationPass.h>
#include <Analyzer/Passes/LogicalExpressionOptimizerPass.h>
#include <Analyzer/Passes/CrossToInnerJoinPass.h>
#include <Analyzer/Passes/RewriteSumFunctionWithSumAndCountPass.h>
#include <Analyzer/Passes/ShardNumColumnToFunctionPass.h>
#include <Analyzer/Passes/ConvertQueryToCNFPass.h>
#include <Analyzer/Passes/AggregateFunctionOfGroupByKeysPass.h>
#include <Analyzer/Passes/OptimizeDateOrDateTimeConverterWithPreimagePass.h>
#include <Analyzer/Passes/SumIfToCountIfPass.h>
#include <Analyzer/Passes/UniqInjectiveFunctionsEliminationPass.h>
#include <Analyzer/Passes/UniqToCountPass.h>
#include <Analyzer/Utils.h>
namespace DB
{

View File

@ -1,3 +1,4 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Common/FieldVisitorToString.h>
#include <DataTypes/DataTypesNumber.h>
@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
"tuple"});
}
}
logProcessorProfile(context, io.pipeline.getProcessors());
}
scalars_cache.emplace(node_with_hash, scalar_block);

View File

@ -1,5 +1,7 @@
#include <Client/ClientApplicationBase.h>
#include <filesystem>
namespace DB
{
@ -108,6 +110,7 @@ void ClientApplicationBase::parseAndCheckOptions(OptionsDescription & options_de
{
/// Two special cases for better usability:
/// - if the option contains a whitespace, it might be a query: clickhouse "SELECT 1"
/// - if the option is a filesystem file, then it's likely a queries file (clickhouse repro.sql)
/// These are relevant for interactive usage - user-friendly, but questionable in general.
/// In case of ambiguity or for scripts, prefer using proper options.
@ -115,8 +118,11 @@ void ClientApplicationBase::parseAndCheckOptions(OptionsDescription & options_de
po::variable_value value(boost::any(op.value), false);
const char * option;
std::error_code ec;
if (token.contains(' '))
option = "query";
else if (std::filesystem::is_regular_file(std::filesystem::path{token}, ec))
option = "queries-file";
else
throw Exception(ErrorCodes::BAD_ARGUMENTS, "Positional option `{}` is not supported.", token);

View File

@ -331,7 +331,7 @@ AsynchronousMetrics::~AsynchronousMetrics()
AsynchronousMetricValues AsynchronousMetrics::getValues() const
{
std::lock_guard lock(data_mutex);
SharedLockGuard lock(values_mutex);
return values;
}
@ -1807,7 +1807,10 @@ void AsynchronousMetrics::update(TimePoint update_time, bool force_update)
first_run = false;
// Finally, update the current metrics.
values = new_values;
{
std::lock_guard values_lock(values_mutex);
values.swap(new_values);
}
}
}

View File

@ -4,6 +4,7 @@
#include <Common/MemoryStatisticsOS.h>
#include <Common/ThreadPool.h>
#include <Common/Stopwatch.h>
#include <Common/SharedMutex.h>
#include <IO/ReadBufferFromFile.h>
#include <condition_variable>
@ -100,6 +101,7 @@ private:
std::condition_variable wait_cond;
bool quit TSA_GUARDED_BY(thread_mutex) = false;
/// Protects all raw data and serializes multiple updates.
mutable std::mutex data_mutex;
/// Some values are incremental and we have to calculate the difference.
@ -107,7 +109,15 @@ private:
bool first_run TSA_GUARDED_BY(data_mutex) = true;
TimePoint previous_update_time TSA_GUARDED_BY(data_mutex);
AsynchronousMetricValues values TSA_GUARDED_BY(data_mutex);
/// Protects saved values.
mutable SharedMutex values_mutex;
/// Values store the result of the last update prepared for reading.
#ifdef OS_LINUX
AsynchronousMetricValues values TSA_GUARDED_BY(values_mutex);
#else
/// When SharedMutex == std::shared_mutex it may not be annotated with the 'capability'.
AsynchronousMetricValues values;
#endif
#if defined(OS_LINUX) || defined(OS_FREEBSD)
MemoryStatisticsOS memory_stat TSA_GUARDED_BY(data_mutex);

View File

@ -1,23 +1,47 @@
#pragma once
#include <Common/OvercommitTracker.h>
#include <base/defines.h>
#include <Common/Exception.h>
#include <Common/OvercommitTracker.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
};
/** LockGuard provides RAII-style locking mechanism for a mutex.
** It's intended to be used like std::unique_ptr but with TSA annotations
** It's intended to be used like std::unique_lock but with TSA annotations
*/
template <typename Mutex>
class TSA_SCOPED_LOCKABLE LockGuard
{
public:
explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { mutex.lock(); }
~LockGuard() TSA_RELEASE() { mutex.unlock(); }
explicit LockGuard(Mutex & mutex_) TSA_ACQUIRE(mutex_) : mutex(mutex_) { lock(); }
~LockGuard() TSA_RELEASE() { if (locked) unlock(); }
void lock() TSA_ACQUIRE()
{
/// Don't allow recursive_mutex for now.
if (locked)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't lock twice the same mutex");
mutex.lock();
locked = true;
}
void unlock() TSA_RELEASE()
{
if (!locked)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't unlock the mutex without locking it first");
mutex.unlock();
locked = false;
}
private:
Mutex & mutex;
bool locked = false;
};
template <template<typename> typename TLockGuard, typename Mutex>

View File

@ -10,33 +10,50 @@
namespace DB
{
using Checksum = CityHash_v1_0_2::uint128;
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
CompressionCodecPtr
getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block)
{
auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
read_buffer->ignore(sizeof(Checksum));
read_buffer.ignore(sizeof(Checksum));
UInt8 header_size = ICompressionCodec::getHeaderSize();
size_t starting_bytes = read_buffer.count();
PODArray<char> compressed_buffer;
compressed_buffer.resize(header_size);
read_buffer->readStrict(compressed_buffer.data(), header_size);
read_buffer.readStrict(compressed_buffer.data(), header_size);
uint8_t method = ICompressionCodec::readMethod(compressed_buffer.data());
size_compressed = unalignedLoad<UInt32>(&compressed_buffer[1]);
size_decompressed = unalignedLoad<UInt32>(&compressed_buffer[5]);
if (method == static_cast<uint8_t>(CompressionMethodByte::Multiple))
{
compressed_buffer.resize(1);
read_buffer->readStrict(compressed_buffer.data(), 1);
read_buffer.readStrict(compressed_buffer.data(), 1);
compressed_buffer.resize(1 + compressed_buffer[0]);
read_buffer->readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
read_buffer.readStrict(compressed_buffer.data() + 1, compressed_buffer[0]);
auto codecs_bytes = CompressionCodecMultiple::getCodecsBytesFromData(compressed_buffer.data());
Codecs codecs;
for (auto byte : codecs_bytes)
codecs.push_back(CompressionCodecFactory::instance().get(byte));
if (skip_to_next_block)
read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
return std::make_shared<CompressionCodecMultiple>(codecs);
}
if (skip_to_next_block)
read_buffer.ignore(size_compressed - (read_buffer.count() - starting_bytes));
return CompressionCodecFactory::instance().get(method);
}
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path)
{
auto read_buffer = data_part_storage.readFile(relative_path, {}, std::nullopt, std::nullopt);
UInt32 size_compressed;
UInt32 size_decompressed;
return getCompressionCodecForFile(*read_buffer, size_compressed, size_decompressed, false);
}
}

View File

@ -13,4 +13,8 @@ namespace DB
/// from metadata.
CompressionCodecPtr getCompressionCodecForFile(const IDataPartStorage & data_part_storage, const String & relative_path);
/// Same as above which is used by clickhouse-compressor to print compression statistics of each data block.
CompressionCodecPtr
getCompressionCodecForFile(ReadBuffer & read_buffer, UInt32 & size_compressed, UInt32 & size_decompressed, bool skip_to_next_block);
}

View File

@ -2892,6 +2892,9 @@ Possible values:
**See Also**
- [ORDER BY Clause](../../sql-reference/statements/select/order-by.md/#optimize_read_in_order)
)", 0) \
DECLARE(Bool, read_in_order_use_virtual_row, false, R"(
Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched.
)", 0) \
DECLARE(Bool, optimize_read_in_window_order, true, R"(
Enable ORDER BY optimization in window clause for reading data in corresponding order in MergeTree tables.
@ -4557,7 +4560,7 @@ Possible values:
- 0 - Disable
- 1 - Enable
)", 0) \
DECLARE(Bool, query_plan_merge_filters, false, R"(
DECLARE(Bool, query_plan_merge_filters, true, R"(
Allow to merge filters in the query plan
)", 0) \
DECLARE(Bool, query_plan_filter_push_down, true, R"(
@ -4858,9 +4861,9 @@ Allows to record the filesystem caching log for each query
DECLARE(Bool, read_from_filesystem_cache_if_exists_otherwise_bypass_cache, false, R"(
Allow to use the filesystem cache in passive mode - benefit from the existing cache entries, but don't put more entries into the cache. If you set this setting for heavy ad-hoc queries and leave it disabled for short real-time queries, this will allows to avoid cache threshing by too heavy queries and to improve the overall system efficiency.
)", 0) \
DECLARE(Bool, skip_download_if_exceeds_query_cache, true, R"(
DECLARE(Bool, filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit, true, R"(
Skip download from remote filesystem if exceeds query cache size
)", 0) \
)", 0) ALIAS(skip_download_if_exceeds_query_cache) \
DECLARE(UInt64, filesystem_cache_max_download_size, (128UL * 1024 * 1024 * 1024), R"(
Max remote filesystem cache size that can be downloaded by a single query
)", 0) \
@ -4872,6 +4875,9 @@ Limit on size of a single batch of file segments that a read buffer can request
)", 0) \
DECLARE(UInt64, filesystem_cache_reserve_space_wait_lock_timeout_milliseconds, 1000, R"(
Wait time to lock cache for space reservation in filesystem cache
)", 0) \
DECLARE(Bool, filesystem_cache_prefer_bigger_buffer_size, true, R"(
Prefer bigger buffer size if filesystem cache is enabled to avoid writing small file segments which deteriorate cache performance. On the other hand, enabling this setting might increase memory usage.
)", 0) \
DECLARE(UInt64, temporary_data_in_cache_reserve_space_wait_lock_timeout_milliseconds, (10 * 60 * 1000), R"(
Wait time to lock cache for space reservation for temporary data in filesystem cache

View File

@ -76,7 +76,11 @@ static std::initializer_list<std::pair<ClickHouseVersion, SettingsChangesHistory
{"backup_restore_keeper_max_retries_while_initializing", 0, 20, "New setting."},
{"backup_restore_keeper_max_retries_while_handling_error", 0, 20, "New setting."},
{"backup_restore_finish_timeout_after_error_sec", 0, 180, "New setting."},
{"query_plan_merge_filters", false, true, "Allow to merge filters in the query plan. This is required to properly support filter-push-down with a new analyzer."},
{"parallel_replicas_local_plan", false, true, "Use local plan for local replica in a query with parallel replicas"},
{"filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit", 1, 1, "Rename of setting skip_download_if_exceeds_query_cache_limit"},
{"filesystem_cache_prefer_bigger_buffer_size", true, true, "New setting"},
{"read_in_order_use_virtual_row", false, false, "Use virtual row while reading in order of primary key or its monotonic function fashion. It is useful when searching over multiple parts as only relevant ones are touched."},
}
},
{"24.10",

View File

@ -6,7 +6,6 @@
#include <Databases/DatabaseReplicated.h>
#include <IO/ReadBufferFromFile.h>
#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/DDLTask.h>
#include <Interpreters/DatabaseCatalog.h>
@ -19,6 +18,7 @@
#include <Common/filesystemHelpers.h>
#include <Core/Settings.h>
namespace fs = std::filesystem;
namespace DB
@ -60,9 +60,6 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, c
, db_uuid(uuid)
{
assert(db_uuid != UUIDHelpers::Nil);
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, ContextPtr context_)
@ -70,6 +67,20 @@ DatabaseAtomic::DatabaseAtomic(String name_, String metadata_path_, UUID uuid, C
{
}
void DatabaseAtomic::createDirectories()
{
std::lock_guard lock(mutex);
createDirectoriesUnlocked();
}
void DatabaseAtomic::createDirectoriesUnlocked()
{
DatabaseOnDisk::createDirectoriesUnlocked();
fs::create_directories(fs::path(getContext()->getPath()) / "metadata");
fs::create_directories(path_to_table_symlinks);
tryCreateMetadataSymlink();
}
String DatabaseAtomic::getTableDataPath(const String & table_name) const
{
std::lock_guard lock(mutex);
@ -108,6 +119,7 @@ void DatabaseAtomic::attachTable(ContextPtr /* context_ */, const String & name,
assert(relative_table_path != data_path && !relative_table_path.empty());
DetachedTables not_in_use;
std::lock_guard lock(mutex);
createDirectoriesUnlocked();
not_in_use = cleanupDetachedTables();
auto table_id = table->getStorageID();
assertDetachedTableNotInUse(table_id.uuid);
@ -208,11 +220,15 @@ void DatabaseAtomic::renameTable(ContextPtr local_context, const String & table_
if (exchange && !supportsAtomicRename(&message))
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "RENAME EXCHANGE is not supported because exchanging files is not supported by the OS ({})", message);
createDirectories();
waitDatabaseStarted();
auto & other_db = dynamic_cast<DatabaseAtomic &>(to_database);
bool inside_database = this == &other_db;
if (!inside_database)
other_db.createDirectories();
String old_metadata_path = getObjectMetadataPath(table_name);
String new_metadata_path = to_database.getObjectMetadataPath(to_table_name);
@ -333,6 +349,7 @@ void DatabaseAtomic::commitCreateTable(const ASTCreateQuery & query, const Stora
const String & table_metadata_tmp_path, const String & table_metadata_path,
ContextPtr query_context)
{
createDirectories();
DetachedTables not_in_use;
auto table_data_path = getTableDataPath(query);
try
@ -469,6 +486,9 @@ void DatabaseAtomic::beforeLoadingMetadata(ContextMutablePtr /*context*/, Loadin
if (mode < LoadingStrictnessLevel::FORCE_RESTORE)
return;
if (!fs::exists(path_to_table_symlinks))
return;
/// Recreate symlinks to table data dirs in case of force restore, because some of them may be broken
for (const auto & table_path : fs::directory_iterator(path_to_table_symlinks))
{
@ -611,6 +631,7 @@ void DatabaseAtomic::renameDatabase(ContextPtr query_context, const String & new
{
/// CREATE, ATTACH, DROP, DETACH and RENAME DATABASE must hold DDLGuard
createDirectories();
waitDatabaseStarted();
bool check_ref_deps = query_context->getSettingsRef()[Setting::check_referential_table_dependencies];
@ -702,4 +723,5 @@ void registerDatabaseAtomic(DatabaseFactory & factory)
};
factory.registerDatabase("Atomic", create_fn);
}
}

View File

@ -76,6 +76,9 @@ protected:
using DetachedTables = std::unordered_map<UUID, StoragePtr>;
[[nodiscard]] DetachedTables cleanupDetachedTables() TSA_REQUIRES(mutex);
void createDirectories();
void createDirectoriesUnlocked() TSA_REQUIRES(mutex);
void tryCreateMetadataSymlink();
virtual bool allowMoveTableToOtherDatabaseEngine(IDatabase & /*to_database*/) const { return false; }

View File

@ -47,6 +47,7 @@ DatabaseLazy::DatabaseLazy(const String & name_, const String & metadata_path_,
: DatabaseOnDisk(name_, metadata_path_, std::filesystem::path("data") / escapeForFileName(name_) / "", "DatabaseLazy (" + name_ + ")", context_)
, expiration_time(expiration_time_)
{
createDirectories();
}

View File

@ -180,7 +180,18 @@ DatabaseOnDisk::DatabaseOnDisk(
, metadata_path(metadata_path_)
, data_path(data_path_)
{
fs::create_directories(local_context->getPath() + data_path);
}
void DatabaseOnDisk::createDirectories()
{
std::lock_guard lock(mutex);
createDirectoriesUnlocked();
}
void DatabaseOnDisk::createDirectoriesUnlocked()
{
fs::create_directories(std::filesystem::path(getContext()->getPath()) / data_path);
fs::create_directories(metadata_path);
}
@ -198,6 +209,8 @@ void DatabaseOnDisk::createTable(
const StoragePtr & table,
const ASTPtr & query)
{
createDirectories();
const auto & settings = local_context->getSettingsRef();
const auto & create = query->as<ASTCreateQuery &>();
assert(table_name == create.getTable());
@ -265,7 +278,6 @@ void DatabaseOnDisk::createTable(
}
commitCreateTable(create, table, table_metadata_tmp_path, table_metadata_path, local_context);
removeDetachedPermanentlyFlag(local_context, table_name, table_metadata_path, false);
}
@ -293,6 +305,8 @@ void DatabaseOnDisk::commitCreateTable(const ASTCreateQuery & query, const Stora
{
try
{
createDirectories();
/// Add a table to the map of known tables.
attachTable(query_context, query.getTable(), table, getTableDataPath(query));
@ -426,6 +440,7 @@ void DatabaseOnDisk::renameTable(
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Moving tables between databases of different engines is not supported");
}
createDirectories();
waitDatabaseStarted();
auto table_data_relative_path = getTableDataPath(table_name);
@ -621,6 +636,9 @@ time_t DatabaseOnDisk::getObjectMetadataModificationTime(const String & object_n
void DatabaseOnDisk::iterateMetadataFiles(const IteratingFunction & process_metadata_file) const
{
if (!fs::exists(metadata_path))
return;
auto process_tmp_drop_metadata_file = [&](const String & file_name)
{
assert(getUUID() == UUIDHelpers::Nil);

View File

@ -99,6 +99,9 @@ protected:
virtual void removeDetachedPermanentlyFlag(ContextPtr context, const String & table_name, const String & table_metadata_path, bool attach);
virtual void setDetachedTableNotInUseForce(const UUID & /*uuid*/) {}
void createDirectories();
void createDirectoriesUnlocked() TSA_REQUIRES(mutex);
const String metadata_path;
const String data_path;
};

View File

@ -416,6 +416,7 @@ public:
std::lock_guard lock{mutex};
return database_name;
}
/// Get UUID of database.
virtual UUID getUUID() const { return UUIDHelpers::Nil; }

View File

@ -62,6 +62,7 @@ DatabaseMaterializedMySQL::DatabaseMaterializedMySQL(
, settings(std::move(settings_))
, materialize_thread(context_, database_name_, mysql_database_name_, std::move(pool_), std::move(client_), binlog_client_, settings.get())
{
createDirectories();
}
DatabaseMaterializedMySQL::~DatabaseMaterializedMySQL() = default;

View File

@ -277,19 +277,6 @@ void AzureObjectStorage::removeObjectImpl(const StoredObject & object, const Sha
}
}
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void AzureObjectStorage::removeObject(const StoredObject & object)
{
removeObjectImpl(object, client.get(), false);
}
void AzureObjectStorage::removeObjects(const StoredObjects & objects)
{
auto client_ptr = client.get();
for (const auto & object : objects)
removeObjectImpl(object, client_ptr, false);
}
void AzureObjectStorage::removeObjectIfExists(const StoredObject & object)
{
removeObjectImpl(object, client.get(), true);

View File

@ -59,11 +59,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@ -148,20 +148,6 @@ void CachedObjectStorage::removeCacheIfExists(const std::string & path_key_for_c
cache->removeKeyIfExists(getCacheKey(path_key_for_cache), FileCache::getCommonUser().user_id);
}
void CachedObjectStorage::removeObject(const StoredObject & object)
{
removeCacheIfExists(object.remote_path);
object_storage->removeObject(object);
}
void CachedObjectStorage::removeObjects(const StoredObjects & objects)
{
for (const auto & object : objects)
removeCacheIfExists(object.remote_path);
object_storage->removeObjects(objects);
}
void CachedObjectStorage::removeObjectIfExists(const StoredObject & object)
{
removeCacheIfExists(object.remote_path);

View File

@ -45,10 +45,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@ -642,7 +642,10 @@ std::unique_ptr<ReadBufferFromFileBase> DiskObjectStorage::readFile(
};
/// Avoid cache fragmentation by choosing bigger buffer size.
bool prefer_bigger_buffer_size = object_storage->supportsCache() && read_settings.enable_filesystem_cache;
bool prefer_bigger_buffer_size = read_settings.filesystem_cache_prefer_bigger_buffer_size
&& object_storage->supportsCache()
&& read_settings.enable_filesystem_cache;
size_t buffer_size = prefer_bigger_buffer_size
? std::max<size_t>(settings.remote_fs_buffer_size, DBMS_DEFAULT_BUFFER_SIZE)
: settings.remote_fs_buffer_size;

View File

@ -480,8 +480,7 @@ struct WriteFileObjectStorageOperation final : public IDiskObjectStorageOperatio
void undo() override
{
if (object_storage.exists(object))
object_storage.removeObject(object);
object_storage.removeObjectIfExists(object);
}
void finalize() override
@ -543,8 +542,7 @@ struct CopyFileObjectStorageOperation final : public IDiskObjectStorageOperation
void undo() override
{
for (const auto & object : created_objects)
destination_object_storage.removeObject(object);
destination_object_storage.removeObjectsIfExist(created_objects);
}
void finalize() override

View File

@ -77,11 +77,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
@ -117,6 +112,11 @@ private:
void initializeHDFSFS() const;
std::string extractObjectKeyFromURL(const StoredObject & object) const;
/// Remove file. Throws exception if file doesn't exists or it's a directory.
void removeObject(const StoredObject & object);
void removeObjects(const StoredObjects & objects);
const Poco::Util::AbstractConfiguration & config;
mutable HDFSBuilderWrapper hdfs_builder;

View File

@ -161,11 +161,11 @@ public:
virtual bool isRemote() const = 0;
/// Remove object. Throws exception if object doesn't exists.
virtual void removeObject(const StoredObject & object) = 0;
// virtual void removeObject(const StoredObject & object) = 0;
/// Remove multiple objects. Some object storages can do batch remove in a more
/// optimal way.
virtual void removeObjects(const StoredObjects & objects) = 0;
// virtual void removeObjects(const StoredObjects & objects) = 0;
/// Remove object on path if exists
virtual void removeObjectIfExists(const StoredObject & object) = 0;

View File

@ -81,7 +81,7 @@ std::unique_ptr<WriteBufferFromFileBase> LocalObjectStorage::writeObject( /// NO
return std::make_unique<WriteBufferFromFile>(object.remote_path, buf_size);
}
void LocalObjectStorage::removeObject(const StoredObject & object)
void LocalObjectStorage::removeObject(const StoredObject & object) const
{
/// For local object storage files are actually removed when "metadata" is removed.
if (!exists(object))
@ -91,7 +91,7 @@ void LocalObjectStorage::removeObject(const StoredObject & object)
ErrnoException::throwFromPath(ErrorCodes::CANNOT_UNLINK, object.remote_path, "Cannot unlink file {}", object.remote_path);
}
void LocalObjectStorage::removeObjects(const StoredObjects & objects)
void LocalObjectStorage::removeObjects(const StoredObjects & objects) const
{
for (const auto & object : objects)
removeObject(object);

View File

@ -42,10 +42,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const StoredObjects & objects) override;
@ -82,6 +78,10 @@ public:
ReadSettings patchSettings(const ReadSettings & read_settings) const override;
private:
void removeObject(const StoredObject & object) const;
void removeObjects(const StoredObjects & objects) const;
String key_prefix;
LoggerPtr log;
std::string description;

View File

@ -203,7 +203,7 @@ void MetadataStorageFromPlainObjectStorageTransaction::unlinkFile(const std::str
{
auto object_key = metadata_storage.object_storage->generateObjectKeyForPath(path, std::nullopt /* key_prefix */);
auto object = StoredObject(object_key.serialize());
metadata_storage.object_storage->removeObject(object);
metadata_storage.object_storage->removeObjectIfExists(object);
}
void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std::string & path)
@ -211,7 +211,7 @@ void MetadataStorageFromPlainObjectStorageTransaction::removeDirectory(const std
if (metadata_storage.object_storage->isWriteOnce())
{
for (auto it = metadata_storage.iterateDirectory(path); it->isValid(); it->next())
metadata_storage.object_storage->removeObject(StoredObject(it->path()));
metadata_storage.object_storage->removeObjectIfExists(StoredObject(it->path()));
}
else
{

View File

@ -107,7 +107,7 @@ void MetadataStorageFromPlainObjectStorageCreateDirectoryOperation::undo(std::un
auto metric = object_storage->getMetadataStorageMetrics().directory_map_size;
CurrentMetrics::sub(metric, 1);
object_storage->removeObject(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
}
else if (write_created)
object_storage->removeObjectIfExists(StoredObject(metadata_object_key.serialize(), path / PREFIX_PATH_FILE_NAME));
@ -247,7 +247,7 @@ void MetadataStorageFromPlainObjectStorageRemoveDirectoryOperation::execute(std:
auto metadata_object_key = createMetadataObjectKey(key_prefix, metadata_key_prefix);
auto metadata_object = StoredObject(/*remote_path*/ metadata_object_key.serialize(), /*local_path*/ path / PREFIX_PATH_FILE_NAME);
object_storage->removeObject(metadata_object);
object_storage->removeObjectIfExists(metadata_object);
{
std::lock_guard lock(path_map.mutex);

View File

@ -326,21 +326,11 @@ void S3ObjectStorage::removeObjectsImpl(const StoredObjects & objects, bool if_e
ProfileEvents::DiskS3DeleteObjects);
}
void S3ObjectStorage::removeObject(const StoredObject & object)
{
removeObjectImpl(object, false);
}
void S3ObjectStorage::removeObjectIfExists(const StoredObject & object)
{
removeObjectImpl(object, true);
}
void S3ObjectStorage::removeObjects(const StoredObjects & objects)
{
removeObjectsImpl(objects, false);
}
void S3ObjectStorage::removeObjectsIfExist(const StoredObjects & objects)
{
removeObjectsImpl(objects, true);

View File

@ -101,13 +101,6 @@ public:
ObjectStorageIteratorPtr iterate(const std::string & path_prefix, size_t max_keys) const override;
/// Uses `DeleteObjectRequest`.
void removeObject(const StoredObject & object) override;
/// Uses `DeleteObjectsRequest` if it is allowed by `s3_capabilities`, otherwise `DeleteObjectRequest`.
/// `DeleteObjectsRequest` is not supported on GCS, see https://issuetracker.google.com/issues/162653700 .
void removeObjects(const StoredObjects & objects) override;
/// Uses `DeleteObjectRequest`.
void removeObjectIfExists(const StoredObject & object) override;

View File

@ -254,16 +254,6 @@ std::unique_ptr<WriteBufferFromFileBase> WebObjectStorage::writeObject( /// NOLI
throwNotAllowed();
}
void WebObjectStorage::removeObject(const StoredObject &)
{
throwNotAllowed();
}
void WebObjectStorage::removeObjects(const StoredObjects &)
{
throwNotAllowed();
}
void WebObjectStorage::removeObjectIfExists(const StoredObject &)
{
throwNotAllowed();

View File

@ -47,10 +47,6 @@ public:
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const WriteSettings & write_settings = {}) override;
void removeObject(const StoredObject & object) override;
void removeObjects(const StoredObjects & objects) override;
void removeObjectIfExists(const StoredObject & object) override;
void removeObjectsIfExist(const StoredObjects & objects) override;

View File

@ -1033,6 +1033,9 @@ private:
size_t tuple_size,
size_t input_rows_count) const
{
if (0 == tuple_size)
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "Comparison of zero-sized tuples is not implemented");
ColumnsWithTypeAndName less_columns(tuple_size);
ColumnsWithTypeAndName equal_columns(tuple_size - 1);
ColumnsWithTypeAndName tmp_columns(2);

View File

@ -4410,7 +4410,7 @@ private:
variant_column = IColumn::mutate(column);
/// Otherwise we should filter column.
else
variant_column = column->filter(filter, variant_size_hint)->assumeMutable();
variant_column = IColumn::mutate(column->filter(filter, variant_size_hint));
assert_cast<ColumnLowCardinality &>(*variant_column).nestedRemoveNullable();
return createVariantFromDescriptorsAndOneNonEmptyVariant(variant_types, std::move(discriminators), std::move(variant_column), variant_discr);

View File

@ -668,6 +668,9 @@ private:
temporary_columns[0] = arguments[0];
size_t tuple_size = type1.getElements().size();
if (tuple_size == 0)
return ColumnTuple::create(input_rows_count);
Columns tuple_columns(tuple_size);
for (size_t i = 0; i < tuple_size; ++i)

View File

@ -61,6 +61,7 @@ struct ReadSettings
bool filesystem_cache_allow_background_download = true;
bool filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage = true;
bool filesystem_cache_allow_background_download_during_fetch = true;
bool filesystem_cache_prefer_bigger_buffer_size = true;
bool use_page_cache_for_disks_without_file_cache = false;
bool read_from_page_cache_if_exists_otherwise_bypass_cache = false;
@ -68,7 +69,7 @@ struct ReadSettings
std::shared_ptr<PageCache> page_cache;
size_t filesystem_cache_max_download_size = (128UL * 1024 * 1024 * 1024);
bool skip_download_if_exceeds_query_cache = true;
bool filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit = true;
size_t remote_read_min_bytes_for_seek = DBMS_DEFAULT_BUFFER_SIZE;

View File

@ -37,7 +37,7 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
/// Case when bucket name represented in domain name of S3 URL.
/// E.g. (https://bucket-name.s3.region.amazonaws.com/key)
/// https://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html#virtual-hosted-style-access
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss|eos)([.\-][a-z0-9\-.:]+))");
static const RE2 virtual_hosted_style_pattern(R"((.+)\.(s3express[\-a-z0-9]+|s3|cos|obs|oss-data-acc|oss|eos)([.\-][a-z0-9\-.:]+))");
/// Case when AWS Private Link Interface is being used
/// E.g. (bucket.vpce-07a1cd78f1bd55c5f-j3a3vg6w.s3.us-east-1.vpce.amazonaws.com/bucket-name/key)
@ -115,7 +115,15 @@ URI::URI(const std::string & uri_, bool allow_archive_path_syntax)
&& re2::RE2::FullMatch(uri.getAuthority(), virtual_hosted_style_pattern, &bucket, &name, &endpoint_authority_from_uri))
{
is_virtual_hosted_style = true;
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
if (name == "oss-data-acc")
{
bucket = bucket.substr(0, bucket.find('.'));
endpoint = uri.getScheme() + "://" + uri.getHost().substr(bucket.length() + 1);
}
else
{
endpoint = uri.getScheme() + "://" + name + endpoint_authority_from_uri;
}
validateBucket(bucket, uri);
if (!uri.getPath().empty())

View File

@ -212,6 +212,22 @@ TEST(S3UriTest, validPatterns)
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
S3::URI uri("https://bucket-test1.oss-cn-beijing-internal.aliyuncs.com/ab-test");
ASSERT_EQ("https://oss-cn-beijing-internal.aliyuncs.com", uri.endpoint);
ASSERT_EQ("bucket-test1", uri.bucket);
ASSERT_EQ("ab-test", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
{
S3::URI uri("https://bucket-test.cn-beijing-internal.oss-data-acc.aliyuncs.com/ab-test");
ASSERT_EQ("https://cn-beijing-internal.oss-data-acc.aliyuncs.com", uri.endpoint);
ASSERT_EQ("bucket-test", uri.bucket);
ASSERT_EQ("ab-test", uri.key);
ASSERT_EQ("", uri.version_id);
ASSERT_EQ(true, uri.is_virtual_hosted_style);
}
}
TEST(S3UriTest, versionIdChecks)

View File

@ -53,7 +53,7 @@ FileCacheQueryLimit::QueryContextPtr FileCacheQueryLimit::getOrSetQueryContext(
{
it->second = std::make_shared<QueryContext>(
settings.filesystem_cache_max_download_size,
!settings.skip_download_if_exceeds_query_cache);
!settings.filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit);
}
return it->second;

View File

@ -196,6 +196,7 @@ namespace Setting
extern const SettingsUInt64 filesystem_cache_segments_batch_size;
extern const SettingsBool filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage;
extern const SettingsBool filesystem_cache_enable_background_download_during_fetch;
extern const SettingsBool filesystem_cache_prefer_bigger_buffer_size;
extern const SettingsBool http_make_head_request;
extern const SettingsUInt64 http_max_fields;
extern const SettingsUInt64 http_max_field_name_size;
@ -236,7 +237,7 @@ namespace Setting
extern const SettingsUInt64 remote_fs_read_backoff_max_tries;
extern const SettingsUInt64 remote_read_min_bytes_for_seek;
extern const SettingsBool throw_on_error_from_cache_on_write_operations;
extern const SettingsBool skip_download_if_exceeds_query_cache;
extern const SettingsBool filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit;
extern const SettingsBool s3_allow_parallel_part_upload;
extern const SettingsBool use_page_cache_for_disks_without_file_cache;
extern const SettingsUInt64 use_structure_from_insertion_table_in_table_functions;
@ -5751,9 +5752,10 @@ ReadSettings Context::getReadSettings() const
res.filesystem_cache_allow_background_download_for_metadata_files_in_packed_storage
= settings_ref[Setting::filesystem_cache_enable_background_download_for_metadata_files_in_packed_storage];
res.filesystem_cache_allow_background_download_during_fetch = settings_ref[Setting::filesystem_cache_enable_background_download_during_fetch];
res.filesystem_cache_prefer_bigger_buffer_size = settings_ref[Setting::filesystem_cache_prefer_bigger_buffer_size];
res.filesystem_cache_max_download_size = settings_ref[Setting::filesystem_cache_max_download_size];
res.skip_download_if_exceeds_query_cache = settings_ref[Setting::skip_download_if_exceeds_query_cache];
res.filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit = settings_ref[Setting::filesystem_cache_skip_download_if_exceeds_per_query_cache_write_limit];
res.page_cache = getPageCache();
res.use_page_cache_for_disks_without_file_cache = settings_ref[Setting::use_page_cache_for_disks_without_file_cache];

View File

@ -5,9 +5,11 @@
#include <Core/Settings.h>
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeTuple.h>
#include <IO/WriteBufferFromString.h>
#include <IO/WriteHelpers.h>
#include <Interpreters/Context.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/addTypeConversionToAST.h>
#include <Interpreters/misc.h>
#include <Parsers/ASTExpressionList.h>
@ -19,9 +21,8 @@
#include <Parsers/ASTWithElement.h>
#include <Parsers/queryToString.h>
#include <Processors/Executors/PullingAsyncPipelineExecutor.h>
#include <Common/ProfileEvents.h>
#include <Common/FieldVisitorToString.h>
#include <IO/WriteBufferFromString.h>
#include <Common/ProfileEvents.h>
namespace ProfileEvents
{
@ -67,6 +68,18 @@ bool ExecuteScalarSubqueriesMatcher::needChildVisit(ASTPtr & node, const ASTPtr
return false;
}
if (auto * tables = node->as<ASTTablesInSelectQueryElement>())
{
/// Contrary to what's said in the code block above, ARRAY JOIN needs to resolve the subquery if possible
/// and assign an alias for 02367_optimize_trivial_count_with_array_join to pass. Otherwise it will fail in
/// ArrayJoinedColumnsVisitor (`No alias for non-trivial value in ARRAY JOIN: _a`)
/// This looks 100% as a incomplete code working on top of a bug, but this code has already been made obsolete
/// by the new analyzer, so it's an inconvenience we can live with until we deprecate it.
if (child == tables->array_join)
return true;
return false;
}
return true;
}
@ -246,6 +259,8 @@ void ExecuteScalarSubqueriesMatcher::visit(const ASTSubquery & subquery, ASTPtr
if (tmp_block.rows() != 0)
throw Exception(ErrorCodes::INCORRECT_RESULT_OF_SCALAR_SUBQUERY, "Scalar subquery returned more than one row");
logProcessorProfile(data.getContext(), io.pipeline.getProcessors());
}
block = materializeBlock(block);

View File

@ -1,21 +1,22 @@
#include <chrono>
#include <variant>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/Set.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <IO/Operators.h>
#include <Common/logger_useful.h>
#include <Core/Block.h>
#include <Core/Settings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <IO/Operators.h>
#include <Interpreters/InterpreterSelectWithUnionQuery.h>
#include <Interpreters/PreparedSets.h>
#include <Interpreters/ProcessorsProfileLog.h>
#include <Interpreters/Set.h>
#include <Processors/Executors/CompletedPipelineExecutor.h>
#include <Processors/QueryPlan/BuildQueryPipelineSettings.h>
#include <Processors/QueryPlan/CreatingSetsStep.h>
#include <Processors/QueryPlan/Optimizations/QueryPlanOptimizationSettings.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Processors/Sinks/EmptySink.h>
#include <Processors/Sinks/NullSink.h>
#include <Processors/QueryPlan/QueryPlan.h>
#include <Core/Block.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <QueryPipeline/SizeLimits.h>
#include <Common/logger_useful.h>
namespace DB
{
@ -239,6 +240,8 @@ SetPtr FutureSetFromSubquery::buildOrderedSetInplace(const ContextPtr & context)
if (!set_and_key->set->isCreated())
return nullptr;
logProcessorProfile(context, pipeline.getProcessors());
return set_and_key->set;
}

View File

@ -1,5 +1,6 @@
#include <Interpreters/ProcessorsProfileLog.h>
#include <Core/Settings.h>
#include <DataTypes/DataTypeArray.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
@ -8,16 +9,19 @@
#include <DataTypes/DataTypeNullable.h>
#include <DataTypes/DataTypeString.h>
#include <DataTypes/DataTypesNumber.h>
#include <Interpreters/Context.h>
#include <base/getFQDNOrHostName.h>
#include <Common/ClickHouseRevision.h>
#include <Common/DateLUTImpl.h>
#include <Common/logger_useful.h>
#include <array>
namespace DB
{
namespace Setting
{
extern const SettingsBool log_processors_profiles;
}
ColumnsDescription ProcessorProfileLogElement::getColumnsDescription()
{
return ColumnsDescription
@ -81,5 +85,57 @@ void ProcessorProfileLogElement::appendToBlock(MutableColumns & columns) const
columns[i++]->insert(output_bytes);
}
void logProcessorProfile(ContextPtr context, const Processors & processors)
{
const Settings & settings = context->getSettingsRef();
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
const auto time_now = std::chrono::system_clock::now();
processor_elem.event_time = timeInSeconds(time_now);
processor_elem.event_time_microseconds = timeInMicroseconds(time_now);
processor_elem.initial_query_id = context->getInitialQueryId();
processor_elem.query_id = context->getCurrentQueryId();
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : processors)
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
}
}

View File

@ -50,4 +50,5 @@ public:
using SystemLog<ProcessorProfileLogElement>::SystemLog;
};
void logProcessorProfile(ContextPtr context, const Processors & processors);
}

View File

@ -1,6 +1,7 @@
#include <base/getFQDNOrHostName.h>
#include <Common/DateLUT.h>
#include <Common/DateLUTImpl.h>
#include <Common/LockGuard.h>
#include <DataTypes/DataTypeDate.h>
#include <DataTypes/DataTypeDateTime.h>
#include <DataTypes/DataTypeDateTime64.h>
@ -16,7 +17,6 @@
#include <chrono>
#include <fmt/chrono.h>
#include <mutex>
namespace DB
@ -24,6 +24,15 @@ namespace DB
static auto logger = getLogger("QueryMetricLog");
String timePointToString(QueryMetricLog::TimePoint time)
{
/// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(time);
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(time - seconds).count();
return fmt::format("{:%Y.%m.%d %H:%M:%S}.{:06}", seconds, microseconds);
}
ColumnsDescription QueryMetricLogElement::getColumnsDescription()
{
ColumnsDescription result;
@ -87,36 +96,73 @@ void QueryMetricLog::shutdown()
Base::shutdown();
}
void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds)
void QueryMetricLog::collectMetric(const ProcessList & process_list, String query_id)
{
QueryMetricLogStatus status;
status.interval_milliseconds = interval_milliseconds;
status.next_collect_time = start_time + std::chrono::milliseconds(interval_milliseconds);
auto current_time = std::chrono::system_clock::now();
const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
if (!query_info)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
return;
}
LockGuard global_lock(queries_mutex);
auto it = queries.find(query_id);
/// The query might have finished while the scheduled task is running.
if (it == queries.end())
{
global_lock.unlock();
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} not found in the list. Finished while this collecting task was running", query_id);
return;
}
auto & query_status = it->second;
if (!query_status.mutex)
{
global_lock.unlock();
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} finished while this collecting task was running", query_id);
return;
}
LockGuard query_lock(query_status.getMutex());
global_lock.unlock();
auto elem = query_status.createLogMetricElement(query_id, *query_info, current_time);
if (elem)
add(std::move(elem.value()));
}
/// We use TSA_NO_THREAD_SAFETY_ANALYSIS to prevent TSA complaining that we're modifying the query_status fields
/// without locking the mutex. Since we're building it from scratch, there's no harm in not holding it.
/// If we locked it to make TSA happy, TSAN build would falsely complain about
/// lock-order-inversion (potential deadlock)
/// which is not a real issue since QueryMetricLogStatus's mutex cannot be locked by anything else
/// until we add it to the queries map.
void QueryMetricLog::startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds) TSA_NO_THREAD_SAFETY_ANALYSIS
{
QueryMetricLogStatus query_status;
QueryMetricLogStatusInfo & info = query_status.info;
info.interval_milliseconds = interval_milliseconds;
info.next_collect_time = start_time;
auto context = getContext();
const auto & process_list = context->getProcessList();
status.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
auto current_time = std::chrono::system_clock::now();
const auto query_info = process_list.getQueryInfo(query_id, false, true, false);
if (!query_info)
{
LOG_TRACE(logger, "Query {} is not running anymore, so we couldn't get its QueryStatusInfo", query_id);
return;
}
auto elem = createLogMetricElement(query_id, *query_info, current_time);
if (elem)
add(std::move(elem.value()));
info.task = context->getSchedulePool().createTask("QueryMetricLog", [this, &process_list, query_id] {
collectMetric(process_list, query_id);
});
std::lock_guard lock(queries_mutex);
status.task->scheduleAfter(interval_milliseconds);
queries.emplace(query_id, std::move(status));
LockGuard global_lock(queries_mutex);
query_status.scheduleNext(query_id);
queries.emplace(query_id, std::move(query_status));
}
void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info)
{
std::unique_lock lock(queries_mutex);
LockGuard global_lock(queries_mutex);
auto it = queries.find(query_id);
/// finishQuery may be called from logExceptionBeforeStart when the query has not even started
@ -124,9 +170,19 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
if (it == queries.end())
return;
auto & query_status = it->second;
decltype(query_status.mutex) query_mutex;
LockGuard query_lock(query_status.getMutex());
/// Move the query mutex here so that we hold it until the end, after removing the query from queries.
query_mutex = std::move(query_status.mutex);
query_status.mutex = {};
global_lock.unlock();
if (query_info)
{
auto elem = createLogMetricElement(query_id, *query_info, finish_time, false);
auto elem = query_status.createLogMetricElement(query_id, *query_info, finish_time, false);
if (elem)
add(std::move(elem.value()));
}
@ -139,51 +195,58 @@ void QueryMetricLog::finishQuery(const String & query_id, TimePoint finish_time,
/// that order.
{
/// Take ownership of the task so that we can destroy it in this scope after unlocking `queries_mutex`.
auto task = std::move(it->second.task);
auto task = std::move(query_status.info.task);
/// Build an empty task for the old task to make sure it does not lock any mutex on its destruction.
it->second.task = {};
query_status.info.task = {};
query_lock.unlock();
global_lock.lock();
queries.erase(query_id);
/// Ensure `queries_mutex` is unlocked before calling task's destructor at the end of this
/// scope which will lock `exec_mutex`.
lock.unlock();
global_lock.unlock();
}
}
std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
void QueryMetricLogStatus::scheduleNext(String query_id)
{
/// fmtlib supports subsecond formatting in 10.0.0. We're in 9.1.0, so we need to add the milliseconds ourselves.
auto seconds = std::chrono::time_point_cast<std::chrono::seconds>(query_info_time);
auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(query_info_time - seconds).count();
LOG_DEBUG(logger, "Collecting query_metric_log for query {} with QueryStatusInfo from {:%Y.%m.%d %H:%M:%S}.{:06}. Schedule next: {}", query_id, seconds, microseconds, schedule_next);
std::unique_lock lock(queries_mutex);
auto query_status_it = queries.find(query_id);
/// The query might have finished while the scheduled task is running.
if (query_status_it == queries.end())
info.next_collect_time += std::chrono::milliseconds(info.interval_milliseconds);
const auto now = std::chrono::system_clock::now();
if (info.next_collect_time > now)
{
lock.unlock();
LOG_TRACE(logger, "Query {} finished already while this collecting task was running", query_id);
return {};
const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(info.next_collect_time - now).count();
info.task->scheduleAfter(wait_time);
}
auto & query_status = query_status_it->second;
if (query_info_time <= query_status.last_collect_time)
else
{
lock.unlock();
LOG_TRACE(logger, "The next collecting task for query {} should have already run at {}. Scheduling it right now",
query_id, timePointToString(info.next_collect_time));
info.task->schedule();
}
}
std::optional<QueryMetricLogElement> QueryMetricLogStatus::createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Collecting query_metric_log for query {} and interval {} ms with QueryStatusInfo from {}. Next collection time: {}",
query_id, info.interval_milliseconds, timePointToString(query_info_time),
schedule_next ? timePointToString(info.next_collect_time + std::chrono::milliseconds(info.interval_milliseconds)) : "finished");
if (query_info_time <= info.last_collect_time)
{
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_TRACE(logger, "Query {} has a more recent metrics collected. Skipping this one", query_id);
return {};
}
query_status.last_collect_time = query_info_time;
info.last_collect_time = query_info_time;
QueryMetricLogElement elem;
elem.event_time = timeInSeconds(query_info_time);
elem.event_time_microseconds = timeInMicroseconds(query_info_time);
elem.query_id = query_status_it->first;
elem.query_id = query_id;
elem.memory_usage = query_info.memory_usage > 0 ? query_info.memory_usage : 0;
elem.peak_memory_usage = query_info.peak_memory_usage > 0 ? query_info.peak_memory_usage : 0;
@ -192,7 +255,7 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
for (ProfileEvents::Event i = ProfileEvents::Event(0), end = ProfileEvents::end(); i < end; ++i)
{
const auto & new_value = (*(query_info.profile_counters))[i];
auto & old_value = query_status.last_profile_events[i];
auto & old_value = info.last_profile_events[i];
/// Profile event counters are supposed to be monotonic. However, at least the `NetworkReceiveBytes` can be inaccurate.
/// So, since in the future the counter should always have a bigger value than in the past, we skip this event.
@ -208,16 +271,13 @@ std::optional<QueryMetricLogElement> QueryMetricLog::createLogMetricElement(cons
}
else
{
LOG_TRACE(logger, "Query {} has no profile counters", query_id);
/// TODO: remove trace before 24.11 release after checking everything is fine on the CI
LOG_DEBUG(logger, "Query {} has no profile counters", query_id);
elem.profile_events = std::vector<ProfileEvents::Count>(ProfileEvents::end());
}
if (schedule_next)
{
query_status.next_collect_time += std::chrono::milliseconds(query_status.interval_milliseconds);
const auto wait_time = std::chrono::duration_cast<std::chrono::milliseconds>(query_status.next_collect_time - std::chrono::system_clock::now()).count();
query_status.task->scheduleAfter(wait_time);
}
scheduleNext(query_id);
return elem;
}

View File

@ -1,5 +1,6 @@
#pragma once
#include <base/defines.h>
#include <Common/ProfileEvents.h>
#include <Common/CurrentMetrics.h>
#include <Core/BackgroundSchedulePool.h>
@ -11,11 +12,17 @@
#include <chrono>
#include <ctime>
#include <mutex>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
};
/** QueryMetricLogElement is a log of query metric values measured at regular time interval.
*/
@ -34,7 +41,7 @@ struct QueryMetricLogElement
void appendToBlock(MutableColumns & columns) const;
};
struct QueryMetricLogStatus
struct QueryMetricLogStatusInfo
{
UInt64 interval_milliseconds;
std::chrono::system_clock::time_point last_collect_time;
@ -43,24 +50,47 @@ struct QueryMetricLogStatus
BackgroundSchedulePool::TaskHolder task;
};
struct QueryMetricLogStatus
{
using TimePoint = std::chrono::system_clock::time_point;
using Mutex = std::mutex;
QueryMetricLogStatusInfo info TSA_GUARDED_BY(getMutex());
/// We need to be able to move it for the hash map, so we need to add an indirection here.
std::unique_ptr<Mutex> mutex = std::make_unique<Mutex>();
/// Return a reference to the mutex, used for Thread Sanitizer annotations.
Mutex & getMutex() const TSA_RETURN_CAPABILITY(mutex)
{
if (!mutex)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Mutex cannot be NULL");
return *mutex;
}
void scheduleNext(String query_id) TSA_REQUIRES(getMutex());
std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true) TSA_REQUIRES(getMutex());
};
class QueryMetricLog : public SystemLog<QueryMetricLogElement>
{
using SystemLog<QueryMetricLogElement>::SystemLog;
using TimePoint = std::chrono::system_clock::time_point;
using Base = SystemLog<QueryMetricLogElement>;
public:
using TimePoint = std::chrono::system_clock::time_point;
void shutdown() final;
// Both startQuery and finishQuery are called from the thread that executes the query
/// Both startQuery and finishQuery are called from the thread that executes the query.
void startQuery(const String & query_id, TimePoint start_time, UInt64 interval_milliseconds);
void finishQuery(const String & query_id, TimePoint finish_time, QueryStatusInfoPtr query_info = nullptr);
private:
std::optional<QueryMetricLogElement> createLogMetricElement(const String & query_id, const QueryStatusInfo & query_info, TimePoint query_info_time, bool schedule_next = true);
void collectMetric(const ProcessList & process_list, String query_id);
std::recursive_mutex queries_mutex;
std::unordered_map<String, QueryMetricLogStatus> queries;
std::mutex queries_mutex;
std::unordered_map<String, QueryMetricLogStatus> queries TSA_GUARDED_BY(queries_mutex);
};
}

View File

@ -161,7 +161,13 @@ void QueryNormalizer::visit(ASTTablesInSelectQueryElement & node, const ASTPtr &
{
auto & join = node.table_join->as<ASTTableJoin &>();
if (join.on_expression)
{
ASTPtr original_on_expression = join.on_expression;
visit(join.on_expression, data);
if (join.on_expression != original_on_expression)
join.children = { join.on_expression };
}
}
}

View File

@ -6,6 +6,12 @@
namespace DB
{
namespace ErrorCode
{
extern const int LOGICAL_ERROR;
}
void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
{
if (auto * func = ast->as<ASTFunction>())
@ -20,21 +26,21 @@ void RewriteArrayExistsFunctionMatcher::visit(ASTPtr & ast, Data & data)
if (join->using_expression_list)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->using_expression_list);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->using_expression_list in '{}'", join->formatForLogging());
visit(join->using_expression_list, data);
if (it && *it != join->using_expression_list)
*it = join->using_expression_list;
*it = join->using_expression_list;
}
if (join->on_expression)
{
auto * it = std::find(join->children.begin(), join->children.end(), join->on_expression);
if (it == join->children.end())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Could not find join->on_expression in '{}'", join->formatForLogging());
visit(join->on_expression, data);
if (it && *it != join->on_expression)
*it = join->on_expression;
*it = join->on_expression;
}
}
}

View File

@ -117,7 +117,6 @@ namespace Setting
extern const SettingsOverflowMode join_overflow_mode;
extern const SettingsString log_comment;
extern const SettingsBool log_formatted_queries;
extern const SettingsBool log_processors_profiles;
extern const SettingsBool log_profile_events;
extern const SettingsUInt64 log_queries_cut_to_length;
extern const SettingsBool log_queries;
@ -506,6 +505,7 @@ void logQueryFinish(
auto time_now = std::chrono::system_clock::now();
QueryStatusInfo info = process_list_elem->getInfo(true, settings[Setting::log_profile_events]);
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
elem.type = QueryLogElementType::QUERY_FINISH;
addStatusInfoToQueryLogElement(elem, info, query_ast, context);
@ -551,53 +551,8 @@ void logQueryFinish(
if (auto query_log = context->getQueryLog())
query_log->add(elem);
}
if (settings[Setting::log_processors_profiles])
{
if (auto processors_profile_log = context->getProcessorsProfileLog())
{
ProcessorProfileLogElement processor_elem;
processor_elem.event_time = elem.event_time;
processor_elem.event_time_microseconds = elem.event_time_microseconds;
processor_elem.initial_query_id = elem.client_info.initial_query_id;
processor_elem.query_id = elem.client_info.current_query_id;
auto get_proc_id = [](const IProcessor & proc) -> UInt64 { return reinterpret_cast<std::uintptr_t>(&proc); };
for (const auto & processor : query_pipeline.getProcessors())
{
std::vector<UInt64> parents;
for (const auto & port : processor->getOutputs())
{
if (!port.isConnected())
continue;
const IProcessor & next = port.getInputPort().getProcessor();
parents.push_back(get_proc_id(next));
}
processor_elem.id = get_proc_id(*processor);
processor_elem.parent_ids = std::move(parents);
processor_elem.plan_step = reinterpret_cast<std::uintptr_t>(processor->getQueryPlanStep());
processor_elem.plan_step_name = processor->getPlanStepName();
processor_elem.plan_step_description = processor->getPlanStepDescription();
processor_elem.plan_group = processor->getQueryPlanStepGroup();
processor_elem.processor_name = processor->getName();
processor_elem.elapsed_us = static_cast<UInt64>(processor->getElapsedNs() / 1000U);
processor_elem.input_wait_elapsed_us = static_cast<UInt64>(processor->getInputWaitElapsedNs() / 1000U);
processor_elem.output_wait_elapsed_us = static_cast<UInt64>(processor->getOutputWaitElapsedNs() / 1000U);
auto stats = processor->getProcessorDataStats();
processor_elem.input_rows = stats.input_rows;
processor_elem.input_bytes = stats.input_bytes;
processor_elem.output_rows = stats.output_rows;
processor_elem.output_bytes = stats.output_bytes;
processors_profile_log->add(processor_elem);
}
}
}
logProcessorProfile(context, query_pipeline.getProcessors());
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, std::make_shared<QueryStatusInfo>(info));
}
@ -669,6 +624,7 @@ void logQueryException(
{
elem.query_duration_ms = start_watch.elapsedMilliseconds();
}
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);
elem.query_cache_usage = QueryCache::Usage::None;
@ -698,8 +654,6 @@ void logQueryException(
query_span->addAttribute("clickhouse.exception_code", elem.exception_code);
query_span->finish();
}
logQueryMetricLogFinish(context, internal, elem.client_info.current_query_id, time_now, info);
}
void logExceptionBeforeStart(
@ -753,6 +707,8 @@ void logExceptionBeforeStart(
elem.client_info = context->getClientInfo();
logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
elem.log_comment = settings[Setting::log_comment];
if (elem.log_comment.size() > settings[Setting::max_query_size])
elem.log_comment.resize(settings[Setting::max_query_size]);
@ -797,8 +753,6 @@ void logExceptionBeforeStart(
ProfileEvents::increment(ProfileEvents::FailedInsertQuery);
}
}
logQueryMetricLogFinish(context, false, elem.client_info.current_query_id, std::chrono::system_clock::now(), nullptr);
}
void validateAnalyzerSettings(ASTPtr ast, bool context_value)

View File

@ -1534,15 +1534,23 @@ static ColumnWithTypeAndName readColumnWithDateData(
for (size_t i = 0; i < orc_int_column->numElements; ++i)
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
if (!orc_int_column->hasNulls || orc_int_column->notNull[i])
{
Int32 days_num = static_cast<Int32>(orc_int_column->data[i]);
if (check_date_range && (days_num > DATE_LUT_MAX_EXTEND_DAY_NUM || days_num < -DAYNUM_OFFSET_EPOCH))
throw Exception(
ErrorCodes::VALUE_IS_OUT_OF_RANGE_OF_DATA_TYPE,
"Input value {} of a column \"{}\" exceeds the range of type Date32",
days_num,
column_name);
column_data.push_back(days_num);
column_data.push_back(days_num);
}
else
{
/// ORC library doesn't guarantee that orc_int_column->data[i] is initialized to zero when orc_int_column->notNull[i] is false since https://github.com/ClickHouse/ClickHouse/pull/69473
column_data.push_back(0);
}
}
return {std::move(internal_column), internal_type, column_name};

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithDelayedChunk.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB

View File

@ -1,5 +1,5 @@
#include <Processors/Merges/Algorithms/IMergingAlgorithmWithSharedChunks.h>
#include <Processors/Merges/Algorithms/MergeTreePartLevelInfo.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{

View File

@ -1,29 +0,0 @@
#pragma once
#include <Processors/Chunk.h>
namespace DB
{
/// To carry part level if chunk is produced by a merge tree source
class MergeTreePartLevelInfo : public ChunkInfoCloneable<MergeTreePartLevelInfo>
{
public:
MergeTreePartLevelInfo() = delete;
explicit MergeTreePartLevelInfo(ssize_t part_level)
: origin_merge_tree_part_level(part_level)
{ }
MergeTreePartLevelInfo(const MergeTreePartLevelInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto part_level_info = chunk.getChunkInfos().get<MergeTreePartLevelInfo>();
if (part_level_info)
return part_level_info->origin_merge_tree_part_level;
return 0;
}
}

View File

@ -0,0 +1,86 @@
#pragma once
#include <Processors/Chunk.h>
#include <Core/Block.h>
#include <Interpreters/ExpressionActions.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
/// To carry part level and virtual row if chunk is produced by a merge tree source
class MergeTreeReadInfo : public ChunkInfoCloneable<MergeTreeReadInfo>
{
public:
MergeTreeReadInfo() = delete;
explicit MergeTreeReadInfo(size_t part_level) :
origin_merge_tree_part_level(part_level) {}
explicit MergeTreeReadInfo(size_t part_level, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_) :
origin_merge_tree_part_level(part_level), pk_block(pk_block_), virtual_row_conversions(std::move(virtual_row_conversions_)) {}
MergeTreeReadInfo(const MergeTreeReadInfo & other) = default;
size_t origin_merge_tree_part_level = 0;
/// If is virtual_row, block should not be empty.
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
inline size_t getPartLevelFromChunk(const Chunk & chunk)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
if (read_info)
return read_info->origin_merge_tree_part_level;
return 0;
}
inline bool isVirtualRow(const Chunk & chunk)
{
const auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
if (read_info)
return read_info->pk_block.columns() > 0;
return false;
}
inline void setVirtualRow(Chunk & chunk, const Block & header, bool apply_virtual_row_conversions)
{
auto read_info = chunk.getChunkInfos().get<MergeTreeReadInfo>();
chassert(read_info);
Block & pk_block = read_info->pk_block;
// std::cerr << apply_virtual_row_conversions << std::endl;
// std::cerr << read_info->virtual_row_conversions->dumpActions() << std::endl;
if (apply_virtual_row_conversions)
read_info->virtual_row_conversions->execute(pk_block);
// std::cerr << "++++" << pk_block.dumpStructure() << std::endl;
Columns ordered_columns;
ordered_columns.reserve(pk_block.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & col = header.getByPosition(i);
if (const auto * pk_col = pk_block.findByName(col.name))
{
if (!col.type->equals(*pk_col->type))
throw Exception(ErrorCodes::LOGICAL_ERROR,
"Virtual row has different type for {}. Expected {}, got {}",
col.name, col.dumpStructure(), pk_col->dumpStructure());
ordered_columns.push_back(pk_col->column);
}
else
ordered_columns.push_back(col.type->createColumnConstWithDefaultValue(1));
}
chunk.setColumns(ordered_columns, 1);
}
}

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/Algorithms/MergingSortedAlgorithm.h>
#include <Processors/Transforms/ColumnGathererTransform.h>
#include <IO/WriteBuffer.h>
@ -16,12 +17,14 @@ MergingSortedAlgorithm::MergingSortedAlgorithm(
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes)
bool use_average_block_sizes,
bool apply_virtual_row_conversions_)
: header(std::move(header_))
, merged_data(use_average_block_sizes, max_block_size_, max_block_size_bytes_)
, description(description_)
, limit(limit_)
, out_row_sources_buf(out_row_sources_buf_)
, apply_virtual_row_conversions(apply_virtual_row_conversions_)
, current_inputs(num_inputs)
, sorting_queue_strategy(sorting_queue_strategy_)
, cursors(num_inputs)
@ -49,6 +52,15 @@ void MergingSortedAlgorithm::addInput()
void MergingSortedAlgorithm::initialize(Inputs inputs)
{
for (auto & input : inputs)
{
if (!isVirtualRow(input.chunk))
continue;
setVirtualRow(input.chunk, header, apply_virtual_row_conversions);
input.skip_last_row = true;
}
removeConstAndSparse(inputs);
merged_data.initialize(header, inputs);
current_inputs = std::move(inputs);

View File

@ -22,7 +22,8 @@ public:
SortingQueueStrategy sorting_queue_strategy_,
UInt64 limit_ = 0,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false);
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions_ = true);
void addInput();
@ -47,6 +48,8 @@ private:
/// If it is not nullptr then it should be populated during execution
WriteBuffer * out_row_sources_buf = nullptr;
bool apply_virtual_row_conversions;
/// Chunks currently being merged.
Inputs current_inputs;

View File

@ -1,3 +1,4 @@
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
#include <Processors/Merges/IMergingTransform.h>
namespace DB
@ -101,11 +102,16 @@ IProcessor::Status IMergingTransformBase::prepareInitializeInputs()
/// setNotNeeded after reading first chunk, because in optimismtic case
/// (e.g. with optimized 'ORDER BY primary_key LIMIT n' and small 'n')
/// we won't have to read any chunks anymore;
auto chunk = input.pull(limit_hint != 0);
if ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end)
/// If virtual row exists, let it pass through, so don't read more chunks.
auto chunk = input.pull(true);
bool virtual_row = isVirtualRow(chunk);
if (limit_hint == 0 && !virtual_row)
input.setNeeded();
if (!chunk.hasRows())
if (!virtual_row && ((limit_hint && chunk.getNumRows() < limit_hint) || always_read_till_end))
input.setNeeded();
if (!virtual_row && !chunk.hasRows())
{
if (!input.isFinished())
{

View File

@ -22,6 +22,7 @@ MergingSortedTransform::MergingSortedTransform(
bool always_read_till_end_,
WriteBuffer * out_row_sources_buf_,
bool use_average_block_sizes,
bool apply_virtual_row_conversions,
bool have_all_inputs_)
: IMergingTransform(
num_inputs,
@ -38,7 +39,8 @@ MergingSortedTransform::MergingSortedTransform(
sorting_queue_strategy,
limit_,
out_row_sources_buf_,
use_average_block_sizes)
use_average_block_sizes,
apply_virtual_row_conversions)
{
}

View File

@ -22,6 +22,7 @@ public:
bool always_read_till_end_ = false,
WriteBuffer * out_row_sources_buf_ = nullptr,
bool use_average_block_sizes = false,
bool apply_virtual_row_conversions = true,
bool have_all_inputs_ = true);
String getName() const override { return "MergingSortedTransform"; }

View File

@ -1,4 +1,5 @@
#include <Processors/QueryPlan/BufferChunksTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
@ -48,14 +49,27 @@ IProcessor::Status BufferChunksTransform::prepare()
}
else if (input.hasData())
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
output.push(std::move(chunk));
if (virtual_row)
{
input.setNotNeeded();
return Status::PortFull;
}
}
}
if (input.hasData() && (num_buffered_rows < max_rows_to_buffer || num_buffered_bytes < max_bytes_to_buffer))
{
auto chunk = pullChunk();
bool virtual_row;
auto chunk = pullChunk(virtual_row);
if (virtual_row)
{
output.push(std::move(chunk));
input.setNotNeeded();
return Status::PortFull;
}
num_buffered_rows += chunk.getNumRows();
num_buffered_bytes += chunk.bytes();
chunks.push(std::move(chunk));
@ -71,10 +85,12 @@ IProcessor::Status BufferChunksTransform::prepare()
return Status::NeedData;
}
Chunk BufferChunksTransform::pullChunk()
Chunk BufferChunksTransform::pullChunk(bool & virtual_row)
{
auto chunk = input.pull();
num_processed_rows += chunk.getNumRows();
virtual_row = isVirtualRow(chunk);
if (!virtual_row)
num_processed_rows += chunk.getNumRows();
if (limit && num_processed_rows >= limit)
input.close();

View File

@ -24,7 +24,7 @@ public:
String getName() const override { return "BufferChunks"; }
private:
Chunk pullChunk();
Chunk pullChunk(bool & virtual_row);
InputPort & input;
OutputPort & output;

View File

@ -6,12 +6,23 @@
namespace DB
{
namespace Setting
{
extern const SettingsBool query_plan_merge_filters;
}
BuildQueryPipelineSettings BuildQueryPipelineSettings::fromContext(ContextPtr from)
{
const auto & query_settings = from->getSettingsRef();
BuildQueryPipelineSettings settings;
settings.actions_settings = ExpressionActionsSettings::fromSettings(from->getSettingsRef(), CompileExpressions::yes);
settings.actions_settings = ExpressionActionsSettings::fromSettings(query_settings, CompileExpressions::yes);
settings.process_list_element = from->getProcessListElement();
settings.progress_callback = from->getProgressCallback();
/// Setting query_plan_merge_filters is enabled by default.
/// But it can brake short-circuit without splitting filter step into smaller steps.
/// So, enable and disable this optimizations together.
settings.enable_multiple_filters_transforms_for_and_chain = query_settings[Setting::query_plan_merge_filters];
return settings;
}

View File

@ -17,6 +17,8 @@ using TemporaryFileLookupPtr = std::shared_ptr<ITemporaryFileLookup>;
struct BuildQueryPipelineSettings
{
bool enable_multiple_filters_transforms_for_and_chain = true;
ExpressionActionsSettings actions_settings;
QueryStatusPtr process_list_element;
ProgressCallback progress_callback = nullptr;

View File

@ -5,6 +5,11 @@
#include <Interpreters/ExpressionActions.h>
#include <IO/Operators.h>
#include <Common/JSONBuilder.h>
#include <DataTypes/DataTypeLowCardinality.h>
#include <DataTypes/DataTypesNumber.h>
#include <Functions/IFunction.h>
#include <stack>
#include <ranges>
namespace DB
{
@ -24,6 +29,92 @@ static ITransformingStep::Traits getTraits()
};
}
static bool isTrivialSubtree(const ActionsDAG::Node * node)
{
while (node->type == ActionsDAG::ActionType::ALIAS)
node = node->children.at(0);
return node->type != ActionsDAG::ActionType::FUNCTION && node->type != ActionsDAG::ActionType::ARRAY_JOIN;
}
struct ActionsAndName
{
ActionsDAG dag;
std::string name;
};
static ActionsAndName splitSingleAndFilter(ActionsDAG & dag, const ActionsDAG::Node * filter_node)
{
auto split_result = dag.split({filter_node}, true);
dag = std::move(split_result.second);
const auto * split_filter_node = split_result.split_nodes_mapping[filter_node];
auto filter_type = removeLowCardinality(split_filter_node->result_type);
if (!filter_type->onlyNull() && !isUInt8(removeNullable(filter_type)))
{
DataTypePtr cast_type = std::make_shared<DataTypeUInt8>();
if (filter_type->isNullable())
cast_type = std::make_shared<DataTypeNullable>(std::move(cast_type));
split_filter_node = &split_result.first.addCast(*split_filter_node, cast_type, {});
}
split_result.first.getOutputs().emplace(split_result.first.getOutputs().begin(), split_filter_node);
auto name = split_filter_node->result_name;
return ActionsAndName{std::move(split_result.first), std::move(name)};
}
/// Try to split the left most AND atom to a separate DAG.
static std::optional<ActionsAndName> trySplitSingleAndFilter(ActionsDAG & dag, const std::string & filter_name)
{
const auto * filter = &dag.findInOutputs(filter_name);
while (filter->type == ActionsDAG::ActionType::ALIAS)
filter = filter->children.at(0);
if (filter->type != ActionsDAG::ActionType::FUNCTION || filter->function_base->getName() != "and")
return {};
const ActionsDAG::Node * condition_to_split = nullptr;
std::stack<const ActionsDAG::Node *> nodes;
nodes.push(filter);
while (!nodes.empty())
{
const auto * node = nodes.top();
nodes.pop();
if (node->type == ActionsDAG::ActionType::FUNCTION && node->function_base->getName() == "and")
{
/// The order is important. We should take the left-most atom, so put conditions on stack in reverse order.
for (const auto * child : node->children | std::ranges::views::reverse)
nodes.push(child);
continue;
}
if (isTrivialSubtree(node))
continue;
/// Do not split subtree if it's the last non-trivial one.
/// So, split the first found condition only when there is a another one found.
if (condition_to_split)
return splitSingleAndFilter(dag, condition_to_split);
condition_to_split = node;
}
return {};
}
std::vector<ActionsAndName> splitAndChainIntoMultipleFilters(ActionsDAG & dag, const std::string & filter_name)
{
std::vector<ActionsAndName> res;
while (auto condition = trySplitSingleAndFilter(dag, filter_name))
res.push_back(std::move(*condition));
return res;
}
FilterStep::FilterStep(
const Header & input_header_,
ActionsDAG actions_dag_,
@ -50,6 +141,23 @@ FilterStep::FilterStep(
void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQueryPipelineSettings & settings)
{
std::vector<ActionsAndName> and_atoms;
/// Splitting AND filter condition to steps under the setting, which is enabled with merge_filters optimization.
/// This is needed to support short-circuit properly.
if (settings.enable_multiple_filters_transforms_for_and_chain && !actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(actions_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
{
bool on_totals = stream_type == QueryPipelineBuilder::StreamType::Totals;
return std::make_shared<FilterTransform>(header, expression, and_atom.name, true, on_totals);
});
}
auto expression = std::make_shared<ExpressionActions>(std::move(actions_dag), settings.getActionsSettings());
pipeline.addSimpleTransform([&](const Block & header, QueryPipelineBuilder::StreamType stream_type)
@ -76,18 +184,45 @@ void FilterStep::transformPipeline(QueryPipelineBuilder & pipeline, const BuildQ
void FilterStep::describeActions(FormatSettings & settings) const
{
String prefix(settings.offset, settings.indent_char);
auto cloned_dag = actions_dag.clone();
std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
settings.out << prefix << "AND column: " << and_atom.name << '\n';
expression->describeActions(settings.out, prefix);
}
settings.out << prefix << "Filter column: " << filter_column_name;
if (remove_filter_column)
settings.out << " (removed)";
settings.out << '\n';
auto expression = std::make_shared<ExpressionActions>(actions_dag.clone());
auto expression = std::make_shared<ExpressionActions>(std::move(cloned_dag));
expression->describeActions(settings.out, prefix);
}
void FilterStep::describeActions(JSONBuilder::JSONMap & map) const
{
auto cloned_dag = actions_dag.clone();
std::vector<ActionsAndName> and_atoms;
if (!actions_dag.hasStatefulFunctions())
and_atoms = splitAndChainIntoMultipleFilters(cloned_dag, filter_column_name);
for (auto & and_atom : and_atoms)
{
auto expression = std::make_shared<ExpressionActions>(std::move(and_atom.dag));
map.add("AND column", and_atom.name);
map.add("Expression", expression->toTree());
}
map.add("Filter Column", filter_column_name);
map.add("Removes Filter", remove_filter_column);

View File

@ -32,7 +32,7 @@ struct QueryPlanOptimizationSettings
bool merge_expressions = true;
/// If merge-filters optimization is enabled.
bool merge_filters = false;
bool merge_filters = true;
/// If filter push down optimization is enabled.
bool filter_push_down = true;

View File

@ -211,6 +211,8 @@ MatchedTrees::Matches matchTrees(const ActionsDAG::NodeRawConstPtrs & inner_dag,
MatchedTrees::Monotonicity monotonicity;
monotonicity.direction *= info.is_positive ? 1 : -1;
monotonicity.strict = info.is_strict;
monotonicity.child_match = &child_match;
monotonicity.child_node = monotonic_child;
if (child_match.monotonicity)
{

View File

@ -22,12 +22,16 @@ namespace DB
/// DAG for PK does not contain aliases and ambiguous nodes.
struct MatchedTrees
{
struct Match;
/// Monotonicity is calculated for monotonic functions chain.
/// Chain is not strict if there is any non-strict monotonic function.
struct Monotonicity
{
int direction = 1;
bool strict = true;
const Match * child_match = nullptr;
const ActionsDAG::Node * child_node = nullptr;
};
struct Match

View File

@ -124,7 +124,7 @@ SortingProperty applyOrder(QueryPlan::Node * parent, SortingProperty * propertie
auto common_prefix = commonPrefix(properties->sort_description, sorting_step->getSortDescription());
if (!common_prefix.empty())
/// Buffering is useful for reading from MergeTree, and it is applied in optimizeReadInOrder only.
sorting_step->convertToFinishSorting(common_prefix, /*use_buffering*/ false);
sorting_step->convertToFinishSorting(common_prefix, /*use_buffering*/ false, false);
}
auto scope = sorting_step->hasPartitions() ? SortingProperty::SortScope::Stream : SortingProperty::SortScope::Global;

View File

@ -324,12 +324,43 @@ void enrichFixedColumns(const ActionsDAG & dag, FixedColumns & fixed_columns)
}
}
const ActionsDAG::Node * addMonotonicChain(ActionsDAG & dag, const ActionsDAG::Node * node, const MatchedTrees::Match * match, const std::string & input_name)
{
if (!match->monotonicity)
return &dag.addInput(input_name, node->result_type);
if (node->type == ActionsDAG::ActionType::ALIAS)
return &dag.addAlias(*addMonotonicChain(dag, node->children.front(), match, input_name), node->result_name);
ActionsDAG::NodeRawConstPtrs args;
args.reserve(node->children.size());
for (const auto * child : node->children)
{
if (child == match->monotonicity->child_node)
args.push_back(addMonotonicChain(dag, match->monotonicity->child_node, match->monotonicity->child_match, input_name));
else
args.push_back(&dag.addColumn({child->column, child->result_type, child->result_name}));
}
return &dag.addFunction(node->function_base, std::move(args), {});
}
struct SortingInputOrder
{
InputOrderInfoPtr input_order{};
/// This is needed for virtual row optimization.
/// Convert the PR values to ORDER BY key.
/// If empty, the optimization cannot be applied.
std::optional<ActionsDAG> virtual_row_conversion{};
};
/// For the case when the order of keys is important (ORDER BY keys).
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
const SortDescription & description,
const KeyDescription & sorting_key,
const Names & pk_column_names,
size_t limit)
{
//std::cerr << "------- buildInputOrderInfo " << std::endl;
@ -369,6 +400,18 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
size_t next_description_column = 0;
size_t next_sort_key = 0;
bool can_optimize_virtual_row = true;
struct MatchInfo
{
const ActionsDAG::Node * source = nullptr;
const ActionsDAG::Node * fixed_column = nullptr;
const MatchedTrees::Match * monotonic = nullptr;
};
std::vector<MatchInfo> match_infos;
match_infos.reserve(description.size());
while (next_description_column < description.size() && next_sort_key < sorting_key.column_names.size())
{
const auto & sorting_key_column = sorting_key.column_names[next_sort_key];
@ -410,6 +453,7 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
//std::cerr << "====== (no dag) Found direct match" << std::endl;
match_infos.push_back({.source = sort_column_node});
++next_description_column;
++next_sort_key;
}
@ -438,24 +482,46 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
{
current_direction *= match.monotonicity->direction;
strict_monotonic = match.monotonicity->strict;
match_infos.push_back({.source = sort_node, .monotonic = &match});
}
else
match_infos.push_back({.source = sort_node});
++next_description_column;
++next_sort_key;
}
else if (fixed_key_columns.contains(sort_column_node))
{
if (next_sort_key == 0)
{
// Disable virtual row optimization.
// For example, when pk is (a,b), a = 1, order by b, virtual row should be
// disabled in the following case:
// 1st part (0, 100), (1, 2), (1, 3), (1, 4)
// 2nd part (0, 100), (1, 2), (1, 3), (1, 4).
can_optimize_virtual_row = false;
}
//std::cerr << "+++++++++ Found fixed key by match" << std::endl;
++next_sort_key;
}
else
{
//std::cerr << "====== Check for fixed const : " << bool(sort_node->column) << " fixed : " << fixed_columns.contains(sort_node) << std::endl;
bool is_fixed_column = sort_node->column || fixed_columns.contains(sort_node);
if (!is_fixed_column)
break;
if (!sort_node->column)
/// Virtual row for fixed column from order by is not supported now.
/// TODO: we can do it for the simple case,
/// But it's better to remove fixed columns from ORDER BY completely, e.g:
/// WHERE x = 42 ORDER BY x, y => WHERE x = 42 ORDER BY y
can_optimize_virtual_row = false;
match_infos.push_back({.source = sort_node, .fixed_column = sort_node});
order_key_prefix_descr.push_back(sort_column_description);
++next_description_column;
}
@ -477,9 +543,46 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
}
if (read_direction == 0 || order_key_prefix_descr.empty())
return nullptr;
return {};
return std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
/// If the prefix description is used, we can't restore the full description from PK value.
/// TODO: partial sort description can be used as well. Implement support later.
if (order_key_prefix_descr.size() < description.size() || pk_column_names.size() < next_sort_key)
can_optimize_virtual_row = false;
auto order_info = std::make_shared<InputOrderInfo>(order_key_prefix_descr, next_sort_key, read_direction, limit);
std::optional<ActionsDAG> virtual_row_conversion;
if (can_optimize_virtual_row)
{
ActionsDAG virtual_row_dag;
virtual_row_dag.getOutputs().reserve(match_infos.size());
size_t next_pk_name = 0;
for (const auto & info : match_infos)
{
const ActionsDAG::Node * output;
if (info.fixed_column)
output = &virtual_row_dag.addColumn({info.fixed_column->column, info.fixed_column->result_type, info.fixed_column->result_name});
else
{
if (info.monotonic)
output = addMonotonicChain(virtual_row_dag, info.source, info.monotonic, pk_column_names[next_pk_name]);
else
{
output = &virtual_row_dag.addInput(pk_column_names[next_pk_name], info.source->result_type);
if (pk_column_names[next_pk_name] != info.source->result_name)
output = &virtual_row_dag.addAlias(*output, info.source->result_name);
}
++next_pk_name;
}
virtual_row_dag.getOutputs().push_back(output);
}
virtual_row_conversion = std::move(virtual_row_dag);
}
return {std::move(order_info), std::move(virtual_row_conversion)};
}
/// We may need a few different sort descriptions here.
@ -689,7 +792,7 @@ InputOrder buildInputOrderFromUnorderedKeys(
return { std::move(input_order), std::move(sort_description) }; // std::move(group_by_sort_description) };
}
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
const ReadFromMergeTree * reading,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -697,15 +800,17 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
size_t limit)
{
const auto & sorting_key = reading->getStorageMetadata()->getSortingKey();
const auto & pk_column_names = reading->getStorageMetadata()->getPrimaryKey().column_names;
return buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
pk_column_names,
limit);
}
InputOrderInfoPtr buildInputOrderFromSortDescription(
SortingInputOrder buildInputOrderFromSortDescription(
ReadFromMerge * merge,
const FixedColumns & fixed_columns,
const std::optional<ActionsDAG> & dag,
@ -714,28 +819,31 @@ InputOrderInfoPtr buildInputOrderFromSortDescription(
{
const auto & tables = merge->getSelectedTables();
InputOrderInfoPtr order_info;
SortingInputOrder order_info;
for (const auto & table : tables)
{
auto storage = std::get<StoragePtr>(table);
const auto & sorting_key = storage->getInMemoryMetadataPtr()->getSortingKey();
auto metadata = storage->getInMemoryMetadataPtr();
const auto & sorting_key = metadata->getSortingKey();
// const auto & pk_column_names = metadata->getPrimaryKey().column_names;
if (sorting_key.column_names.empty())
return nullptr;
return {};
auto table_order_info = buildInputOrderFromSortDescription(
fixed_columns,
dag, description,
sorting_key,
{},
limit);
if (!table_order_info)
return nullptr;
if (!table_order_info.input_order)
return {};
if (!order_info)
order_info = table_order_info;
else if (*order_info != *table_order_info)
return nullptr;
if (!order_info.input_order)
order_info = std::move(table_order_info);
else if (*order_info.input_order != *table_order_info.input_order)
return {};
}
return order_info;
@ -791,7 +899,7 @@ InputOrder buildInputOrderFromUnorderedKeys(
return order_info;
}
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & node)
InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, bool & apply_virtual_row, QueryPlan::Node & node)
{
QueryPlan::Node * reading_node = findReadingStep(node, /*allow_existing_order=*/ false);
if (!reading_node)
@ -815,14 +923,21 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = reading->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
apply_virtual_row = order_info.virtual_row_conversion != std::nullopt;
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit,
std::move(order_info.virtual_row_conversion));
if (!can_read)
return nullptr;
}
return order_info;
return order_info.input_order;
}
if (auto * merge = typeid_cast<ReadFromMerge *>(reading_node->step.get()))
{
@ -832,14 +947,14 @@ InputOrderInfoPtr buildInputOrderInfo(SortingStep & sorting, QueryPlan::Node & n
dag, description,
limit);
if (order_info)
if (order_info.input_order)
{
bool can_read = merge->requestReadingInOrder(order_info);
bool can_read = merge->requestReadingInOrder(order_info.input_order);
if (!can_read)
return nullptr;
}
return order_info;
return order_info.input_order;
}
return nullptr;
@ -873,7 +988,8 @@ InputOrder buildInputOrderInfo(AggregatingStep & aggregating, QueryPlan::Node &
bool can_read = reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit);
order_info.input_order->limit,
{});
if (!can_read)
return {};
}
@ -962,7 +1078,7 @@ InputOrder buildInputOrderInfo(DistinctStep & distinct, QueryPlan::Node & node)
if (!reading->requestReadingInOrder(
order_info.input_order->used_prefix_of_sorting_key_size,
order_info.input_order->direction,
order_info.input_order->limit))
order_info.input_order->limit, {}))
return {};
return order_info;
@ -1014,6 +1130,8 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
if (sorting->getType() != SortingStep::Type::Full)
return;
bool apply_virtual_row = false;
if (typeid_cast<UnionStep *>(node.children.front()->step.get()))
{
auto & union_node = node.children.front();
@ -1036,7 +1154,7 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
for (auto * child : union_node->children)
{
infos.push_back(buildInputOrderInfo(*sorting, *child));
infos.push_back(buildInputOrderInfo(*sorting, apply_virtual_row, *child));
if (infos.back())
{
@ -1088,13 +1206,13 @@ void optimizeReadInOrder(QueryPlan::Node & node, QueryPlan::Nodes & nodes)
}
}
sorting->convertToFinishSorting(*max_sort_descr, use_buffering);
sorting->convertToFinishSorting(*max_sort_descr, use_buffering, false);
}
else if (auto order_info = buildInputOrderInfo(*sorting, *node.children.front()))
else if (auto order_info = buildInputOrderInfo(*sorting, apply_virtual_row, *node.children.front()))
{
/// Use buffering only if have filter or don't have limit.
bool use_buffering = order_info->limit == 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, use_buffering);
sorting->convertToFinishSorting(order_info->sort_description_for_merging, use_buffering, apply_virtual_row);
}
}
@ -1233,10 +1351,10 @@ size_t tryReuseStorageOrderingForWindowFunctions(QueryPlan::Node * parent_node,
if (order_info)
{
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit);
bool can_read = read_from_merge_tree->requestReadingInOrder(order_info->used_prefix_of_sorting_key_size, order_info->direction, order_info->limit, {});
if (!can_read)
return 0;
sorting->convertToFinishSorting(order_info->sort_description_for_merging, false);
sorting->convertToFinishSorting(order_info->sort_description_for_merging, false, false);
}
return 0;

View File

@ -27,6 +27,7 @@
#include <Processors/Transforms/FilterTransform.h>
#include <Processors/Transforms/ReverseTransform.h>
#include <Processors/Transforms/SelectByIndicesTransform.h>
#include <Processors/Transforms/VirtualRowTransform.h>
#include <QueryPipeline/QueryPipelineBuilder.h>
#include <Storages/MergeTree/MergeTreeDataSelectExecutor.h>
#include <Storages/MergeTree/MergeTreeIndexLegacyVectorSimilarity.h>
@ -175,7 +176,9 @@ namespace Setting
extern const SettingsBool use_skip_indexes;
extern const SettingsBool use_skip_indexes_if_final;
extern const SettingsBool use_uncompressed_cache;
extern const SettingsBool query_plan_merge_filters;
extern const SettingsUInt64 merge_tree_min_read_task_size;
extern const SettingsBool read_in_order_use_virtual_row;
}
namespace MergeTreeSetting
@ -206,6 +209,7 @@ static MergeTreeReaderSettings getMergeTreeReaderSettings(
.use_asynchronous_read_from_pool = settings[Setting::allow_asynchronous_read_from_io_pool_for_merge_tree]
&& (settings[Setting::max_streams_to_max_threads_ratio] > 1 || settings[Setting::max_streams_for_merge_tree_reading] > 1),
.enable_multiple_prewhere_read_steps = settings[Setting::enable_multiple_prewhere_read_steps],
.force_short_circuit_execution = settings[Setting::query_plan_merge_filters]
};
}
@ -680,7 +684,34 @@ Pipe ReadFromMergeTree::readInOrder(
if (set_total_rows_approx)
source->addTotalRowsApprox(total_rows);
pipes.emplace_back(std::move(source));
Pipe pipe(source);
if (virtual_row_conversion && (read_type == ReadType::InOrder))
{
const auto & index = part_with_ranges.data_part->getIndex();
const auto & primary_key = storage_snapshot->metadata->primary_key;
size_t mark_range_begin = part_with_ranges.ranges.front().begin;
ColumnsWithTypeAndName pk_columns;
size_t num_columns = virtual_row_conversion->getRequiredColumnsWithTypes().size();
pk_columns.reserve(num_columns);
for (size_t j = 0; j < num_columns; ++j)
{
auto column = primary_key.data_types[j]->createColumn()->cloneEmpty();
column->insert((*(*index)[j])[mark_range_begin]);
pk_columns.push_back({std::move(column), primary_key.data_types[j], primary_key.column_names[j]});
}
Block pk_block(std::move(pk_columns));
pipe.addSimpleTransform([&](const Block & header)
{
return std::make_shared<VirtualRowTransform>(header, pk_block, virtual_row_conversion);
});
}
pipes.emplace_back(std::move(pipe));
}
auto pipe = Pipe::unitePipes(std::move(pipes));
@ -1140,7 +1171,8 @@ Pipe ReadFromMergeTree::spreadMarkRangesAmongStreamsWithOrder(
if (pipe.numOutputPorts() > 1)
{
auto transform = std::make_shared<MergingSortedTransform>(
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch);
pipe.getHeader(), pipe.numOutputPorts(), sort_description, block_size.max_block_size_rows, /*max_block_size_bytes=*/0, SortingQueueStrategy::Batch,
0, false, nullptr, false, /*apply_virtual_row_conversions*/ false);
pipe.addTransform(std::move(transform));
}
@ -1799,7 +1831,7 @@ void ReadFromMergeTree::updateSortDescription()
enable_vertical_final);
}
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit)
bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction, size_t read_limit, std::optional<ActionsDAG> virtual_row_conversion_)
{
/// if dirction is not set, use current one
if (!direction)
@ -1822,6 +1854,10 @@ bool ReadFromMergeTree::requestReadingInOrder(size_t prefix_size, int direction,
/// Let prefer in-order optimization over vertical FINAL for now
enable_vertical_final = false;
/// Disable virtual row for FINAL.
if (virtual_row_conversion_ && !isQueryWithFinal() && context->getSettingsRef()[Setting::read_in_order_use_virtual_row])
virtual_row_conversion = std::make_shared<ExpressionActions>(std::move(*virtual_row_conversion_));
updateSortDescription();
return true;
@ -2238,6 +2274,12 @@ void ReadFromMergeTree::describeActions(FormatSettings & format_settings) const
expression->describeActions(format_settings.out, prefix);
}
}
if (virtual_row_conversion)
{
format_settings.out << prefix << "Virtual row conversions" << '\n';
virtual_row_conversion->describeActions(format_settings.out, prefix);
}
}
void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
@ -2277,6 +2319,9 @@ void ReadFromMergeTree::describeActions(JSONBuilder::JSONMap & map) const
map.add("Prewhere info", std::move(prewhere_info_map));
}
if (virtual_row_conversion)
map.add("Virtual row conversions", virtual_row_conversion->toTree());
}
void ReadFromMergeTree::describeIndexes(FormatSettings & format_settings) const

View File

@ -187,7 +187,7 @@ public:
StorageMetadataPtr getStorageMetadata() const { return storage_snapshot->metadata; }
/// Returns `false` if requested reading cannot be performed.
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit);
bool requestReadingInOrder(size_t prefix_size, int direction, size_t limit, std::optional<ActionsDAG> virtual_row_conversion_);
bool readsInOrder() const;
const InputOrderInfoPtr & getInputOrder() const { return query_info.input_order_info; }
const SortDescription & getSortDescription() const override { return result_sort_description; }
@ -281,6 +281,9 @@ private:
std::optional<MergeTreeReadTaskCallback> read_task_callback;
bool enable_vertical_final = false;
bool enable_remove_parts_from_snapshot_optimization = true;
ExpressionActionsPtr virtual_row_conversion;
std::optional<size_t> number_of_current_replica;
};

View File

@ -145,11 +145,12 @@ void SortingStep::updateLimit(size_t limit_)
}
}
void SortingStep::convertToFinishSorting(SortDescription prefix_description_, bool use_buffering_)
void SortingStep::convertToFinishSorting(SortDescription prefix_description_, bool use_buffering_, bool apply_virtual_row_conversions_)
{
type = Type::FinishSorting;
prefix_description = std::move(prefix_description_);
use_buffering = use_buffering_;
apply_virtual_row_conversions = apply_virtual_row_conversions_;
}
void SortingStep::scatterByPartitionIfNeeded(QueryPipelineBuilder& pipeline)
@ -253,7 +254,10 @@ void SortingStep::mergingSorted(QueryPipelineBuilder & pipeline, const SortDescr
/*max_block_size_bytes=*/0,
SortingQueueStrategy::Batch,
limit_,
always_read_till_end);
always_read_till_end,
nullptr,
false,
apply_virtual_row_conversions);
pipeline.addTransform(std::move(transform));
}

View File

@ -84,7 +84,7 @@ public:
bool isSortingForMergeJoin() const { return is_sorting_for_merge_join; }
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_);
void convertToFinishSorting(SortDescription prefix_description, bool use_buffering_, bool apply_virtual_row_conversions_);
Type getType() const { return type; }
const Settings & getSettings() const { return sort_settings; }
@ -134,6 +134,7 @@ private:
UInt64 limit;
bool always_read_till_end = false;
bool use_buffering = false;
bool apply_virtual_row_conversions = false;
Settings sort_settings;
};

View File

@ -187,6 +187,7 @@ void MergeSortingTransform::consume(Chunk chunk)
{
bool have_all_inputs = false;
bool use_average_block_sizes = false;
bool apply_virtual_row = false;
external_merging_sorted = std::make_shared<MergingSortedTransform>(
header_without_constants,
@ -199,6 +200,7 @@ void MergeSortingTransform::consume(Chunk chunk)
/*always_read_till_end_=*/ false,
nullptr,
use_average_block_sizes,
apply_virtual_row,
have_all_inputs);
processors.emplace_back(external_merging_sorted);

View File

@ -0,0 +1,111 @@
#include <Processors/Transforms/VirtualRowTransform.h>
#include <Processors/Merges/Algorithms/MergeTreeReadInfo.h>
namespace DB
{
namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}
VirtualRowTransform::VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_)
: IProcessor({header_}, {header_})
, input(inputs.front()), output(outputs.front())
, pk_block(pk_block_)
, virtual_row_conversions(std::move(virtual_row_conversions_))
{
}
VirtualRowTransform::Status VirtualRowTransform::prepare()
{
/// Check can output.
if (output.isFinished())
{
input.close();
return Status::Finished;
}
if (!output.canPush())
{
input.setNotNeeded();
return Status::PortFull;
}
/// Output if has data.
if (generated)
{
output.push(std::move(current_chunk));
generated = false;
return Status::PortFull;
}
if (can_generate)
return Status::Ready;
/// Check can input.
if (!has_input)
{
if (input.isFinished())
{
output.finish();
return Status::Finished;
}
input.setNeeded();
if (!input.hasData())
return Status::NeedData;
/// Set input port NotNeeded after chunk was pulled.
current_chunk = input.pull(true);
has_input = true;
}
/// Now transform.
return Status::Ready;
}
void VirtualRowTransform::work()
{
if (can_generate)
{
if (generated)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it already was generated");
generated = true;
can_generate = false;
if (!is_first)
{
if (current_chunk.empty())
throw Exception(ErrorCodes::LOGICAL_ERROR, "Can't generate chunk in VirtualRowTransform");
return;
}
is_first = false;
Columns empty_columns;
const auto & header = getOutputs().front().getHeader();
empty_columns.reserve(header.columns());
for (size_t i = 0; i < header.columns(); ++i)
{
const ColumnWithTypeAndName & type_and_name = header.getByPosition(i);
empty_columns.push_back(type_and_name.type->createColumn()->cloneEmpty());
}
current_chunk.setColumns(empty_columns, 0);
current_chunk.getChunkInfos().add(std::make_shared<MergeTreeReadInfo>(0, pk_block, virtual_row_conversions));
}
else
{
if (!has_input)
throw Exception(ErrorCodes::LOGICAL_ERROR, "VirtualRowTransform cannot consume chunk because it wasn't read");
has_input = false;
can_generate = true;
}
}
}

View File

@ -0,0 +1,35 @@
#pragma once
#include <Processors/IProcessor.h>
#include <Storages/KeyDescription.h>
#include <Storages/MergeTree/IMergeTreeDataPart.h>
namespace DB
{
/// Virtual row is useful for read-in-order optimization when multiple parts exist.
class VirtualRowTransform : public IProcessor
{
public:
explicit VirtualRowTransform(const Block & header_, const Block & pk_block_, ExpressionActionsPtr virtual_row_conversions_);
String getName() const override { return "VirtualRowTransform"; }
Status prepare() override;
void work() override;
private:
InputPort & input;
OutputPort & output;
Chunk current_chunk;
bool has_input = false;
bool generated = false;
bool can_generate = true;
bool is_first = true;
Block pk_block;
ExpressionActionsPtr virtual_row_conversions;
};
}

View File

@ -10,8 +10,8 @@ namespace DB
/**
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster
* Needed for code simplification around parallel_distributed_insert_select
* Base cluster for Storages used in table functions like s3Cluster and hdfsCluster.
* Necessary for code simplification around parallel_distributed_insert_select.
*/
class IStorageCluster : public IStorage
{

Some files were not shown because too many files have changed in this diff Show More