diff --git a/README.md b/README.md
index abaf27abf11..3270cd19671 100644
--- a/README.md
+++ b/README.md
@@ -42,7 +42,6 @@ Keep an eye out for upcoming meetups and events around the world. Somewhere else
Upcoming meetups
-* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Ghent Meetup](https://www.meetup.com/clickhouse-belgium-user-group/events/303049405/) - November 19
* [Dubai Meetup](https://www.meetup.com/clickhouse-dubai-meetup-group/events/303096989/) - November 21
* [Paris Meetup](https://www.meetup.com/clickhouse-france-user-group/events/303096434) - November 26
@@ -53,6 +52,7 @@ Upcoming meetups
Recently completed meetups
+* [Barcelona Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096876/) - November 12
* [Madrid Meetup](https://www.meetup.com/clickhouse-spain-user-group/events/303096564/) - October 22
* [Singapore Meetup](https://www.meetup.com/clickhouse-singapore-meetup-group/events/303212064/) - October 3
* [Jakarta Meetup](https://www.meetup.com/clickhouse-indonesia-user-group/events/303191359/) - October 1
diff --git a/base/base/defines.h b/base/base/defines.h
index 5685a6d9833..a0c3c0d1de5 100644
--- a/base/base/defines.h
+++ b/base/base/defines.h
@@ -145,6 +145,7 @@
#define TSA_TRY_ACQUIRE_SHARED(...) __attribute__((try_acquire_shared_capability(__VA_ARGS__))) /// function tries to acquire a shared capability and returns a boolean value indicating success or failure
#define TSA_RELEASE_SHARED(...) __attribute__((release_shared_capability(__VA_ARGS__))) /// function releases the given shared capability
#define TSA_SCOPED_LOCKABLE __attribute__((scoped_lockable)) /// object of a class has scoped lockable capability
+#define TSA_RETURN_CAPABILITY(...) __attribute__((lock_returned(__VA_ARGS__))) /// to return capabilities in functions
/// Macros for suppressing TSA warnings for specific reads/writes (instead of suppressing it for the whole function)
/// They use a lambda function to apply function attribute to a single statement. This enable us to suppress warnings locally instead of
diff --git a/docker/server/Dockerfile.ubuntu b/docker/server/Dockerfile.ubuntu
index 0fe9a409ee4..e6bde845c4e 100644
--- a/docker/server/Dockerfile.ubuntu
+++ b/docker/server/Dockerfile.ubuntu
@@ -113,7 +113,9 @@ RUN clickhouse local -q 'SELECT 1' >/dev/null 2>&1 && exit 0 || : \
/var/lib/apt/lists/* \
/var/cache/debconf \
/tmp/* \
- && apt-get autoremove --purge -yq dirmngr gnupg2
+ && apt-get autoremove --purge -yq dirmngr gnupg2 \
+ && chmod ugo+Xrw -R /etc/clickhouse-server /etc/clickhouse-client
+# The last chmod is here to make the next one a no-op in the docker official library Dockerfile
# post install
# we need to allow "others" access to clickhouse folder, because docker container
diff --git a/docker/server/entrypoint.sh b/docker/server/entrypoint.sh
index 2f87008f2e5..947244dd97f 100755
--- a/docker/server/entrypoint.sh
+++ b/docker/server/entrypoint.sh
@@ -162,7 +162,7 @@ if [ -n "${RUN_INITDB_SCRIPTS}" ]; then
tries=${CLICKHOUSE_INIT_TIMEOUT:-1000}
while ! wget --spider --no-check-certificate -T 1 -q "$URL" 2>/dev/null; do
if [ "$tries" -le "0" ]; then
- echo >&2 'ClickHouse init process failed.'
+ echo >&2 'ClickHouse init process timeout.'
exit 1
fi
tries=$(( tries-1 ))
diff --git a/docs/en/getting-started/example-datasets/tpch.md b/docs/en/getting-started/example-datasets/tpch.md
index de2c425b402..3ea4bffec38 100644
--- a/docs/en/getting-started/example-datasets/tpch.md
+++ b/docs/en/getting-started/example-datasets/tpch.md
@@ -46,7 +46,7 @@ Detailed table sizes with scale factor 100:
| orders | 150.000.000 | 6.15 GB |
| lineitem | 600.00.00 | 26.69 GB |
-(The table sizes in ClickHouse are taken from `system.tables.total_bytes` and based on below table definitions.
+(Compressed sizes in ClickHouse are taken from `system.tables.total_bytes` and based on below table definitions.)
Now create tables in ClickHouse.
diff --git a/docs/en/operations/server-configuration-parameters/settings.md b/docs/en/operations/server-configuration-parameters/settings.md
index c5f92ccdf68..ca4938b1a47 100644
--- a/docs/en/operations/server-configuration-parameters/settings.md
+++ b/docs/en/operations/server-configuration-parameters/settings.md
@@ -597,6 +597,30 @@ If number of tables is greater than this value, server will throw an exception.
400
```
+## max\_replicated\_table\_num\_to\_throw {#max-replicated-table-num-to-throw}
+If the number of replicated tables is greater than this value, the server will throw an exception. 0 means no limitation. Only tables in the Atomic/Ordinary/Replicated/Lazy database engines are counted.
+
+**Example**
+```xml
+400
+```
+
+## max\_dictionary\_num\_to\_throw {#max-dictionary-num-to-throw}
+If the number of dictionaries is greater than this value, the server will throw an exception. 0 means no limitation. Only dictionaries in the Atomic/Ordinary/Replicated/Lazy database engines are counted.
+
+**Example**
+```xml
+400
+```
+
+## max\_view\_num\_to\_throw {#max-view-num-to-throw}
+If the number of views is greater than this value, the server will throw an exception. 0 means no limitation. Only views in the Atomic/Ordinary/Replicated/Lazy database engines are counted.
+
+**Example**
+```xml
+400
+```
+
## max\_database\_num\_to\_throw {#max-table-num-to-throw}
If number of _database is greater than this value, server will throw an exception. 0 means no limitation.
Default value: 0
diff --git a/docs/en/sql-reference/aggregate-functions/reference/index.md b/docs/en/sql-reference/aggregate-functions/reference/index.md
index d7b287f764b..ee8f0d5882e 100644
--- a/docs/en/sql-reference/aggregate-functions/reference/index.md
+++ b/docs/en/sql-reference/aggregate-functions/reference/index.md
@@ -7,115 +7,4 @@ toc_hidden: true
# List of Aggregate Functions
-ClickHouse supports all standard SQL functions (sum, avg, min, max, count) and a wide range of aggregate functions for various applications:
-
-- [aggThrow](../reference/aggthrow.md)
-- [analysisOfVariance](../reference/analysis_of_variance.md)
-- [anyHeavy](../reference/anyheavy.md)
-- [anyLast](../reference/anylast.md)
-- [any](../reference/any.md)
-- [argMax](../reference/argmax.md)
-- [argMin](../reference/argmin.md)
-- [avgWeighted](../reference/avgweighted.md)
-- [avg](../reference/avg.md)
-- [boundingRatio](../reference/boundrat.md)
-- [categoricalInformationValue](../reference/categoricalinformationvalue.md)
-- [contingency](../reference/contingency.md)
-- [corrMatrix](../reference/corrmatrix.md)
-- [corr](../reference/corr.md)
-- [corr](../reference/corrstable.md)
-- [count](../reference/count.md)
-- [covarPopMatrix](../reference/covarpopmatrix.md)
-- [covarPop](../reference/covarpop.md)
-- [covarSampMatrix](../reference/covarsampmatrix.md)
-- [covarSampStable](../reference/covarsampstable.md)
-- [covarSamp](../reference/covarsamp.md)
-- [covarStable](../reference/covarpopstable.md)
-- [cramersVBiasCorrected](../reference/cramersvbiascorrected.md)
-- [cramersV](../reference/cramersv.md)
-- [deltaSumTimestamp](../reference/deltasumtimestamp.md)
-- [deltaSum](../reference/deltasum.md)
-- [entropy](../reference/entropy.md)
-- [exponentialMovingAverage](../reference/exponentialmovingaverage.md)
-- [first_value](../reference/first_value.md)
-- [flameGraph](../reference/flame_graph.md)
-- [groupArrayInsertAt](../reference/grouparrayinsertat.md)
-- [groupArrayIntersect](../reference/grouparrayintersect.md)
-- [groupArrayLast](../reference/grouparraylast.md)
-- [groupArrayMovingAvg](../reference/grouparraymovingavg.md)
-- [groupArrayMovingSum](../reference/grouparraymovingsum.md)
-- [groupArraySample](../reference/grouparraysample.md)
-- [groupArraySorted](../reference/grouparraysorted.md)
-- [groupArray](../reference/grouparray.md)
-- [groupBitAnd](../reference/groupbitand.md)
-- [groupBitOr](../reference/groupbitor.md)
-- [groupBitXor](../reference/groupbitxor.md)
-- [groupBitmapAnd](../reference/groupbitmapand.md)
-- [groupBitmapOr](../reference/groupbitmapor.md)
-- [groupBitmapXor](../reference/groupbitmapxor.md)
-- [groupBitmap](../reference/groupbitmap.md)
-- [groupUniqArray](../reference/groupuniqarray.md)
-- [intervalLengthSum](../reference/intervalLengthSum.md)
-- [kolmogorovSmirnovTest](../reference/kolmogorovsmirnovtest.md)
-- [kurtPop](../reference/kurtpop.md)
-- [kurtSamp](../reference/kurtsamp.md)
-- [largestTriangleThreeBuckets](../reference/largestTriangleThreeBuckets.md)
-- [last_value](../reference/last_value.md)
-- [mannwhitneyutest](../reference/mannwhitneyutest.md)
-- [maxIntersectionsPosition](../reference/maxintersectionsposition.md)
-- [maxIntersections](../reference/maxintersections.md)
-- [maxMap](../reference/maxmap.md)
-- [max](../reference/max.md)
-- [meanZTest](../reference/meanztest.md)
-- [median](../reference/median.md)
-- [minMap](../reference/minmap.md)
-- [min](../reference/min.md)
-- [quantileBFloat16Weighted](../reference/quantilebfloat16.md#quantilebfloat16weighted)
-- [quantileBFloat16](../reference/quantilebfloat16.md#quantilebfloat16)
-- [quantileDD](../reference/quantileddsketch.md#quantileddsketch)
-- [quantileDeterministic](../reference/quantiledeterministic.md)
-- [quantileExactHigh](../reference/quantileexact.md#quantileexacthigh)
-- [quantileExactLow](../reference/quantileexact.md#quantileexactlow)
-- [quantileExactWeighted](../reference/quantileexactweighted.md)
-- [quantileExact](../reference/quantileexact.md)
-- [quantileGK](../reference/quantileGK.md)
-- [quantileInterpolatedWeighted](../reference/quantileinterpolatedweighted.md)
-- [quantileTDigestWeighted](../reference/quantiletdigestweighted.md)
-- [quantileTDigest](../reference/quantiletdigest.md)
-- [quantileTimingWeighted](../reference/quantiletimingweighted.md)
-- [quantileTiming](../reference/quantiletiming.md)
-- [quantile](../reference/quantile.md)
-- [quantiles](../reference/quantiles.md)
-- [rankCorr](../reference/rankCorr.md)
-- [simpleLinearRegression](../reference/simplelinearregression.md)
-- [singleValueOrNull](../reference/singlevalueornull.md)
-- [skewPop](../reference/skewpop.md)
-- [skewSamp](../reference/skewsamp.md)
-- [sparkBar](../reference/sparkbar.md)
-- [stddevPopStable](../reference/stddevpopstable.md)
-- [stddevPop](../reference/stddevpop.md)
-- [stddevSampStable](../reference/stddevsampstable.md)
-- [stddevSamp](../reference/stddevsamp.md)
-- [stochasticLinearRegression](../reference/stochasticlinearregression.md)
-- [stochasticLogisticRegression](../reference/stochasticlogisticregression.md)
-- [studentTTest](../reference/studentttest.md)
-- [sumCount](../reference/sumcount.md)
-- [sumKahan](../reference/sumkahan.md)
-- [sumMapFilteredWithOverflow](../parametric-functions.md/#summapfilteredwithoverflow)
-- [sumMapFiltered](../parametric-functions.md/#summapfiltered)
-- [sumMapWithOverflow](../reference/summapwithoverflow.md)
-- [sumMap](../reference/summap.md)
-- [sumWithOverflow](../reference/sumwithoverflow.md)
-- [sum](../reference/sum.md)
-- [theilsU](../reference/theilsu.md)
-- [topKWeighted](../reference/topkweighted.md)
-- [topK](../reference/topk.md)
-- [uniqCombined64](../reference/uniqcombined64.md)
-- [uniqCombined](../reference/uniqcombined.md)
-- [uniqExact](../reference/uniqexact.md)
-- [uniqHLL12](../reference/uniqhll12.md)
-- [uniqTheta](../reference/uniqthetasketch.md)
-- [uniq](../reference/uniq.md)
-- [varPop](../reference/varpop.md)
-- [varSamp](../reference/varsamp.md)
-- [welchTTest](../reference/welchttest.md)
+ClickHouse supports all standard SQL aggregate functions ([sum](../reference/sum.md), [avg](../reference/avg.md), [min](../reference/min.md), [max](../reference/max.md), [count](../reference/count.md)), as well as a wide range of other aggregate functions.
diff --git a/docs/en/sql-reference/data-types/aggregatefunction.md b/docs/en/sql-reference/data-types/aggregatefunction.md
index 37f0d0e50ae..4cad27db68b 100644
--- a/docs/en/sql-reference/data-types/aggregatefunction.md
+++ b/docs/en/sql-reference/data-types/aggregatefunction.md
@@ -6,7 +6,9 @@ sidebar_label: AggregateFunction
# AggregateFunction
-Aggregate functions can have an implementation-defined intermediate state that can be serialized to an `AggregateFunction(...)` data type and stored in a table, usually, by means of [a materialized view](../../sql-reference/statements/create/view.md). The common way to produce an aggregate function state is by calling the aggregate function with the `-State` suffix. To get the final result of aggregation in the future, you must use the same aggregate function with the `-Merge`suffix.
+Aggregate functions have an implementation-defined intermediate state that can be serialized to an `AggregateFunction(...)` data type and stored in a table, usually, by means of [a materialized view](../../sql-reference/statements/create/view.md).
+The common way to produce an aggregate function state is by calling the aggregate function with the `-State` suffix.
+To get the final result of aggregation in the future, you must use the same aggregate function with the `-Merge` suffix.
`AggregateFunction(name, types_of_arguments...)` — parametric data type.
diff --git a/docs/en/sql-reference/data-types/index.md b/docs/en/sql-reference/data-types/index.md
index 2b89dd145e6..134678f71bb 100644
--- a/docs/en/sql-reference/data-types/index.md
+++ b/docs/en/sql-reference/data-types/index.md
@@ -6,29 +6,8 @@ sidebar_position: 1
# Data Types in ClickHouse
-ClickHouse can store various kinds of data in table cells. This section describes the supported data types and special considerations for using and/or implementing them if any.
+This section describes the data types supported by ClickHouse, for example [integers](int-uint.md), [floats](float.md) and [strings](string.md).
-:::note
-You can check whether a data type name is case-sensitive in the [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) table.
-:::
-
-ClickHouse data types include:
-
-- **Integer types**: [signed and unsigned integers](./int-uint.md) (`UInt8`, `UInt16`, `UInt32`, `UInt64`, `UInt128`, `UInt256`, `Int8`, `Int16`, `Int32`, `Int64`, `Int128`, `Int256`)
-- **Floating-point numbers**: [floats](./float.md)(`Float32` and `Float64`) and [`Decimal` values](./decimal.md)
-- **Boolean**: ClickHouse has a [`Boolean` type](./boolean.md)
-- **Strings**: [`String`](./string.md) and [`FixedString`](./fixedstring.md)
-- **Dates**: use [`Date`](./date.md) and [`Date32`](./date32.md) for days, and [`DateTime`](./datetime.md) and [`DateTime64`](./datetime64.md) for instances in time
-- **Object**: the [`Object`](./json.md) stores a JSON document in a single column (deprecated)
-- **JSON**: the [`JSON` object](./newjson.md) stores a JSON document in a single column
-- **UUID**: a performant option for storing [`UUID` values](./uuid.md)
-- **Low cardinality types**: use an [`Enum`](./enum.md) when you have a handful of unique values, or use [`LowCardinality`](./lowcardinality.md) when you have up to 10,000 unique values of a column
-- **Arrays**: any column can be defined as an [`Array` of values](./array.md)
-- **Maps**: use [`Map`](./map.md) for storing key/value pairs
-- **Aggregation function types**: use [`SimpleAggregateFunction`](./simpleaggregatefunction.md) and [`AggregateFunction`](./aggregatefunction.md) for storing the intermediate status of aggregate function results
-- **Nested data structures**: A [`Nested` data structure](./nested-data-structures/index.md) is like a table inside a cell
-- **Tuples**: A [`Tuple` of elements](./tuple.md), each having an individual type.
-- **Nullable**: [`Nullable`](./nullable.md) allows you to store a value as `NULL` when a value is "missing" (instead of the column settings its default value for the data type)
-- **IP addresses**: use [`IPv4`](./ipv4.md) and [`IPv6`](./ipv6.md) to efficiently store IP addresses
-- **Geo types**: for [geographical data](./geo.md), including `Point`, `Ring`, `Polygon` and `MultiPolygon`
-- **Special data types**: including [`Expression`](./special-data-types/expression.md), [`Set`](./special-data-types/set.md), [`Nothing`](./special-data-types/nothing.md) and [`Interval`](./special-data-types/interval.md)
+System table [system.data_type_families](../../operations/system-tables/data_type_families.md#system_tables-data_type_families) provides an
+overview of all available data types.
+It also shows whether a data type is an alias to another data type, and whether its name is case-sensitive (e.g. `bool` vs. `BOOL`).
diff --git a/docs/en/sql-reference/data-types/json.md b/docs/en/sql-reference/data-types/json.md
index e48b308a620..ce69f15f0fa 100644
--- a/docs/en/sql-reference/data-types/json.md
+++ b/docs/en/sql-reference/data-types/json.md
@@ -7,7 +7,7 @@ keywords: [object, data type]
# Object Data Type (deprecated)
-**This feature is not production-ready and is now deprecated.** If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-formats/json/overview) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864).
+**This feature is not production-ready and deprecated.** If you need to work with JSON documents, consider using [this guide](/docs/en/integrations/data-formats/json/overview) instead. A new implementation to support JSON object is in progress and can be tracked [here](https://github.com/ClickHouse/ClickHouse/issues/54864).
diff --git a/docs/en/sql-reference/data-types/simpleaggregatefunction.md b/docs/en/sql-reference/data-types/simpleaggregatefunction.md
index 4fb74ac30e4..8edd8b5b8ff 100644
--- a/docs/en/sql-reference/data-types/simpleaggregatefunction.md
+++ b/docs/en/sql-reference/data-types/simpleaggregatefunction.md
@@ -5,7 +5,9 @@ sidebar_label: SimpleAggregateFunction
---
# SimpleAggregateFunction
-`SimpleAggregateFunction(name, types_of_arguments...)` data type stores current value of the aggregate function, and does not store its full state as [`AggregateFunction`](../../sql-reference/data-types/aggregatefunction.md) does. This optimization can be applied to functions for which the following property holds: the result of applying a function `f` to a row set `S1 UNION ALL S2` can be obtained by applying `f` to parts of the row set separately, and then again applying `f` to the results: `f(S1 UNION ALL S2) = f(f(S1) UNION ALL f(S2))`. This property guarantees that partial aggregation results are enough to compute the combined one, so we do not have to store and process any extra data.
+`SimpleAggregateFunction(name, types_of_arguments...)` data type stores the current value (intermediate state) of the aggregate function, but not its full state as [`AggregateFunction`](../../sql-reference/data-types/aggregatefunction.md) does.
+This optimization can be applied to functions for which the following property holds: the result of applying a function `f` to a row set `S1 UNION ALL S2` can be obtained by applying `f` to parts of the row set separately, and then again applying `f` to the results: `f(S1 UNION ALL S2) = f(f(S1) UNION ALL f(S2))`.
+This property guarantees that partial aggregation results are enough to compute the combined one, so we do not have to store and process any extra data.
The common way to produce an aggregate function value is by calling the aggregate function with the [-SimpleState](../../sql-reference/aggregate-functions/combinators.md#agg-functions-combinator-simplestate) suffix.
diff --git a/docs/en/sql-reference/functions/date-time-functions.md b/docs/en/sql-reference/functions/date-time-functions.md
index 2357b5b2fdd..34dc6e996ee 100644
--- a/docs/en/sql-reference/functions/date-time-functions.md
+++ b/docs/en/sql-reference/functions/date-time-functions.md
@@ -4773,7 +4773,7 @@ Result:
## toUTCTimestamp
-Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp
+Convert DateTime/DateTime64 type value from other time zone to UTC timezone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@@ -4799,14 +4799,14 @@ SELECT toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai');
Result:
``` text
-┌─toUTCTimestamp(toDateTime('2023-03-16'),'Asia/Shanghai')┐
+┌─toUTCTimestamp(toDateTime('2023-03-16'), 'Asia/Shanghai')┐
│ 2023-03-15 16:00:00 │
└─────────────────────────────────────────────────────────┘
```
## fromUTCTimestamp
-Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp
+Convert DateTime/DateTime64 type value from UTC timezone to other time zone timestamp. This function is mainly included for compatibility with Apache Spark and similar frameworks.
**Syntax**
@@ -4832,7 +4832,7 @@ SELECT fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00', 3), 'Asia/Shanghai')
Result:
``` text
-┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3),'Asia/Shanghai')─┐
+┌─fromUTCTimestamp(toDateTime64('2023-03-16 10:00:00',3), 'Asia/Shanghai')─┐
│ 2023-03-16 18:00:00.000 │
└─────────────────────────────────────────────────────────────────────────┘
```
diff --git a/docs/en/sql-reference/functions/geo/index.md b/docs/en/sql-reference/functions/geo/index.md
index d46e60281e2..51b6868611a 100644
--- a/docs/en/sql-reference/functions/geo/index.md
+++ b/docs/en/sql-reference/functions/geo/index.md
@@ -5,70 +5,4 @@ sidebar_position: 62
title: "Geo Functions"
---
-
-## Geographical Coordinates Functions
-
-- [greatCircleDistance](./coordinates.md#greatcircledistance)
-- [geoDistance](./coordinates.md#geodistance)
-- [greatCircleAngle](./coordinates.md#greatcircleangle)
-- [pointInEllipses](./coordinates.md#pointinellipses)
-- [pointInPolygon](./coordinates.md#pointinpolygon)
-
-## Geohash Functions
-- [geohashEncode](./geohash.md#geohashencode)
-- [geohashDecode](./geohash.md#geohashdecode)
-- [geohashesInBox](./geohash.md#geohashesinbox)
-
-## H3 Indexes Functions
-
-- [h3IsValid](./h3.md#h3isvalid)
-- [h3GetResolution](./h3.md#h3getresolution)
-- [h3EdgeAngle](./h3.md#h3edgeangle)
-- [h3EdgeLengthM](./h3.md#h3edgelengthm)
-- [h3EdgeLengthKm](./h3.md#h3edgelengthkm)
-- [geoToH3](./h3.md#geotoh3)
-- [h3ToGeo](./h3.md#h3togeo)
-- [h3ToGeoBoundary](./h3.md#h3togeoboundary)
-- [h3kRing](./h3.md#h3kring)
-- [h3GetBaseCell](./h3.md#h3getbasecell)
-- [h3HexAreaM2](./h3.md#h3hexaream2)
-- [h3HexAreaKm2](./h3.md#h3hexareakm2)
-- [h3IndexesAreNeighbors](./h3.md#h3indexesareneighbors)
-- [h3ToChildren](./h3.md#h3tochildren)
-- [h3ToParent](./h3.md#h3toparent)
-- [h3ToString](./h3.md#h3tostring)
-- [stringToH3](./h3.md#stringtoh3)
-- [h3GetResolution](./h3.md#h3getresolution)
-- [h3IsResClassIII](./h3.md#h3isresclassiii)
-- [h3IsPentagon](./h3.md#h3ispentagon)
-- [h3GetFaces](./h3.md#h3getfaces)
-- [h3CellAreaM2](./h3.md#h3cellaream2)
-- [h3CellAreaRads2](./h3.md#h3cellarearads2)
-- [h3ToCenterChild](./h3.md#h3tocenterchild)
-- [h3ExactEdgeLengthM](./h3.md#h3exactedgelengthm)
-- [h3ExactEdgeLengthKm](./h3.md#h3exactedgelengthkm)
-- [h3ExactEdgeLengthRads](./h3.md#h3exactedgelengthrads)
-- [h3NumHexagons](./h3.md#h3numhexagons)
-- [h3Line](./h3.md#h3line)
-- [h3Distance](./h3.md#h3distance)
-- [h3HexRing](./h3.md#h3hexring)
-- [h3GetUnidirectionalEdge](./h3.md#h3getunidirectionaledge)
-- [h3UnidirectionalEdgeIsValid](./h3.md#h3unidirectionaledgeisvalid)
-- [h3GetOriginIndexFromUnidirectionalEdge](./h3.md#h3getoriginindexfromunidirectionaledge)
-- [h3GetDestinationIndexFromUnidirectionalEdge](./h3.md#h3getdestinationindexfromunidirectionaledge)
-- [h3GetIndexesFromUnidirectionalEdge](./h3.md#h3getindexesfromunidirectionaledge)
-- [h3GetUnidirectionalEdgesFromHexagon](./h3.md#h3getunidirectionaledgesfromhexagon)
-- [h3GetUnidirectionalEdgeBoundary](./h3.md#h3getunidirectionaledgeboundary)
-
-## S2 Index Functions
-
-- [geoToS2](./s2.md#geotos2)
-- [s2ToGeo](./s2.md#s2togeo)
-- [s2GetNeighbors](./s2.md#s2getneighbors)
-- [s2CellsIntersect](./s2.md#s2cellsintersect)
-- [s2CapContains](./s2.md#s2capcontains)
-- [s2CapUnion](./s2.md#s2capunion)
-- [s2RectAdd](./s2.md#s2rectadd)
-- [s2RectContains](./s2.md#s2rectcontains)
-- [s2RectUnion](./s2.md#s2rectunion)
-- [s2RectIntersection](./s2.md#s2rectintersection)
+Functions for working with geometric objects, for example [to calculate distances between points on a sphere](./coordinates.md), [compute geohashes](./geohash.md), and work with [h3 indexes](./h3.md).
diff --git a/docs/en/sql-reference/statements/alter/column.md b/docs/en/sql-reference/statements/alter/column.md
index 29df041ccc6..fb16dacb7c8 100644
--- a/docs/en/sql-reference/statements/alter/column.md
+++ b/docs/en/sql-reference/statements/alter/column.md
@@ -279,7 +279,7 @@ For columns with a new or updated `MATERIALIZED` value expression, all existing
For columns with a new or updated `DEFAULT` value expression, the behavior depends on the ClickHouse version:
- In ClickHouse < v24.2, all existing rows are rewritten.
-- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
+- ClickHouse >= v24.2 distinguishes if a row value in a column with `DEFAULT` value expression was explicitly specified when it was inserted, or not, i.e. calculated from the `DEFAULT` value expression. If the value was explicitly specified, ClickHouse keeps it as is. If the value was calculated, ClickHouse changes it to the new or updated `MATERIALIZED` value expression.
Syntax:
diff --git a/docs/en/sql-reference/statements/create/index.md b/docs/en/sql-reference/statements/create/index.md
index fa39526a53e..5854d7cf9d2 100644
--- a/docs/en/sql-reference/statements/create/index.md
+++ b/docs/en/sql-reference/statements/create/index.md
@@ -6,16 +6,4 @@ sidebar_label: CREATE
# CREATE Queries
-Create queries make a new entity of one of the following kinds:
-
-- [DATABASE](/docs/en/sql-reference/statements/create/database.md)
-- [TABLE](/docs/en/sql-reference/statements/create/table.md)
-- [VIEW](/docs/en/sql-reference/statements/create/view.md)
-- [DICTIONARY](/docs/en/sql-reference/statements/create/dictionary.md)
-- [FUNCTION](/docs/en/sql-reference/statements/create/function.md)
-- [USER](/docs/en/sql-reference/statements/create/user.md)
-- [ROLE](/docs/en/sql-reference/statements/create/role.md)
-- [ROW POLICY](/docs/en/sql-reference/statements/create/row-policy.md)
-- [QUOTA](/docs/en/sql-reference/statements/create/quota.md)
-- [SETTINGS PROFILE](/docs/en/sql-reference/statements/create/settings-profile.md)
-- [NAMED COLLECTION](/docs/en/sql-reference/statements/create/named-collection.md)
+CREATE queries create (for example) new [databases](/docs/en/sql-reference/statements/create/database.md), [tables](/docs/en/sql-reference/statements/create/table.md) and [views](/docs/en/sql-reference/statements/create/view.md).
diff --git a/docs/en/sql-reference/statements/index.md b/docs/en/sql-reference/statements/index.md
index 5aa61cf8d21..f288b30b27b 100644
--- a/docs/en/sql-reference/statements/index.md
+++ b/docs/en/sql-reference/statements/index.md
@@ -6,27 +6,4 @@ sidebar_label: List of statements
# ClickHouse SQL Statements
-Statements represent various kinds of action you can perform using SQL queries. Each kind of statement has it’s own syntax and usage details that are described separately:
-
-- [SELECT](/docs/en/sql-reference/statements/select/index.md)
-- [INSERT INTO](/docs/en/sql-reference/statements/insert-into.md)
-- [CREATE](/docs/en/sql-reference/statements/create/index.md)
-- [ALTER](/docs/en/sql-reference/statements/alter/index.md)
-- [SYSTEM](/docs/en/sql-reference/statements/system.md)
-- [SHOW](/docs/en/sql-reference/statements/show.md)
-- [GRANT](/docs/en/sql-reference/statements/grant.md)
-- [REVOKE](/docs/en/sql-reference/statements/revoke.md)
-- [ATTACH](/docs/en/sql-reference/statements/attach.md)
-- [CHECK TABLE](/docs/en/sql-reference/statements/check-table.md)
-- [DESCRIBE TABLE](/docs/en/sql-reference/statements/describe-table.md)
-- [DETACH](/docs/en/sql-reference/statements/detach.md)
-- [DROP](/docs/en/sql-reference/statements/drop.md)
-- [EXISTS](/docs/en/sql-reference/statements/exists.md)
-- [KILL](/docs/en/sql-reference/statements/kill.md)
-- [OPTIMIZE](/docs/en/sql-reference/statements/optimize.md)
-- [RENAME](/docs/en/sql-reference/statements/rename.md)
-- [SET](/docs/en/sql-reference/statements/set.md)
-- [SET ROLE](/docs/en/sql-reference/statements/set-role.md)
-- [TRUNCATE](/docs/en/sql-reference/statements/truncate.md)
-- [USE](/docs/en/sql-reference/statements/use.md)
-- [EXPLAIN](/docs/en/sql-reference/statements/explain.md)
+Users interact with ClickHouse using SQL statements. ClickHouse supports common SQL statements like [SELECT](select/index.md) and [CREATE](create/index.md), but it also provides specialized statements like [KILL](kill.md) and [OPTIMIZE](optimize.md).
diff --git a/programs/compressor/Compressor.cpp b/programs/compressor/Compressor.cpp
index 843eed06f39..27f0bd1614b 100644
--- a/programs/compressor/Compressor.cpp
+++ b/programs/compressor/Compressor.cpp
@@ -12,9 +12,12 @@
#include
#include
#include
+#include
+#include
#include
#include
#include
+#include
#include
#include
#include
@@ -43,29 +46,24 @@ namespace CurrentMetrics
namespace
{
-/// Outputs sizes of uncompressed and compressed blocks for compressed file.
+/// Outputs method, sizes of uncompressed and compressed blocks for compressed file.
void checkAndWriteHeader(DB::ReadBuffer & in, DB::WriteBuffer & out)
{
while (!in.eof())
{
- in.ignore(16); /// checksum
-
- char header[COMPRESSED_BLOCK_HEADER_SIZE];
- in.readStrict(header, COMPRESSED_BLOCK_HEADER_SIZE);
-
- UInt32 size_compressed = unalignedLoad(&header[1]);
+ UInt32 size_compressed;
+ UInt32 size_decompressed;
+ auto codec = DB::getCompressionCodecForFile(in, size_compressed, size_decompressed, true /* skip_to_next_block */);
if (size_compressed > DBMS_MAX_COMPRESSED_SIZE)
throw DB::Exception(DB::ErrorCodes::TOO_LARGE_SIZE_COMPRESSED, "Too large size_compressed. Most likely corrupted data.");
- UInt32 size_decompressed = unalignedLoad(&header[5]);
-
+ DB::writeText(queryToString(codec->getFullCodecDesc()), out);
+ DB::writeChar('\t', out);
DB::writeText(size_decompressed, out);
DB::writeChar('\t', out);
DB::writeText(size_compressed, out);
DB::writeChar('\n', out);
-
- in.ignore(size_compressed - COMPRESSED_BLOCK_HEADER_SIZE);
}
}
diff --git a/programs/local/LocalServer.cpp b/programs/local/LocalServer.cpp
index ebcb593dcb6..e6f8ecef097 100644
--- a/programs/local/LocalServer.cpp
+++ b/programs/local/LocalServer.cpp
@@ -14,6 +14,7 @@
#include
#include
#include
+#include
#include
#include
#include
@@ -257,12 +258,12 @@ static DatabasePtr createMemoryDatabaseIfNotExists(ContextPtr context, const Str
return system_database;
}
-static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context_)
+static DatabasePtr createClickHouseLocalDatabaseOverlay(const String & name_, ContextPtr context)
{
- auto databaseCombiner = std::make_shared(name_, context_);
- databaseCombiner->registerNextDatabase(std::make_shared(name_, "", context_));
- databaseCombiner->registerNextDatabase(std::make_shared(name_, context_));
- return databaseCombiner;
+ auto overlay = std::make_shared(name_, context);
+ overlay->registerNextDatabase(std::make_shared(name_, fs::weakly_canonical(context->getPath()), UUIDHelpers::generateV4(), context));
+ overlay->registerNextDatabase(std::make_shared(name_, "", context));
+ return overlay;
}
/// If path is specified and not empty, will try to setup server environment and load existing metadata
@@ -811,7 +812,12 @@ void LocalServer::processConfig()
DatabaseCatalog::instance().initializeAndLoadTemporaryDatabase();
std::string default_database = server_settings[ServerSetting::default_database];
- DatabaseCatalog::instance().attachDatabase(default_database, createClickHouseLocalDatabaseOverlay(default_database, global_context));
+ {
+ DatabasePtr database = createClickHouseLocalDatabaseOverlay(default_database, global_context);
+ if (UUID uuid = database->getUUID(); uuid != UUIDHelpers::Nil)
+ DatabaseCatalog::instance().addUUIDMapping(uuid);
+ DatabaseCatalog::instance().attachDatabase(default_database, database);
+ }
global_context->setCurrentDatabase(default_database);
if (getClientConfiguration().has("path"))
diff --git a/programs/main.cpp b/programs/main.cpp
index 02ea1471108..d15c20867d1 100644
--- a/programs/main.cpp
+++ b/programs/main.cpp
@@ -1,27 +1,22 @@
-#include
-#include
+#include
+#include
+#include
+#include
-#include
-#include
-#include
-#include
-#include
-#include /// pair
-
-#include
+#if defined(SANITIZE_COVERAGE)
+# include
+#endif
#include "config.h"
#include "config_tools.h"
-#include
-#include
-#include
-#include
-#include
-
-#include
-#include
-
+#include
+#include
+#include
+#include
+#include
+#include /// pair
+#include
/// Universal executable for various clickhouse applications
int mainEntryClickHouseServer(int argc, char ** argv);
@@ -238,9 +233,12 @@ int main(int argc_, char ** argv_)
/// clickhouse # spawn local
/// clickhouse local # spawn local
/// clickhouse "select ..." # spawn local
+ /// clickhouse /tmp/repro --enable-analyzer
///
- if (main_func == printHelp && !argv.empty() && (argv.size() == 1 || argv[1][0] == '-'
- || std::string_view(argv[1]).contains(' ')))
+ std::error_code ec;
+ if (main_func == printHelp && !argv.empty()
+ && (argv.size() == 1 || argv[1][0] == '-' || std::string_view(argv[1]).contains(' ')
+ || std::filesystem::is_regular_file(std::filesystem::path{argv[1]}, ec)))
{
main_func = mainEntryClickHouseLocal;
}
diff --git a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp
index 5819c533fd9..ad1fecac784 100644
--- a/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp
+++ b/src/AggregateFunctions/AggregateFunctionDeltaSumTimestamp.cpp
@@ -22,6 +22,13 @@ namespace ErrorCodes
namespace
{
+/** Due to a lack of proper code review, this code was contributed with a multiplication of template instantiations
+ * over all pairs of data types, and we deeply regret that.
+ *
+ * We cannot remove all combinations, because the binary representation of serialized data has to remain the same,
+ * but we can partially heal the wound by treating unsigned and signed data types in the same way.
+ */
+
template
struct AggregationFunctionDeltaSumTimestampData
{
@@ -37,23 +44,22 @@ template
class AggregationFunctionDeltaSumTimestamp final
: public IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData,
- AggregationFunctionDeltaSumTimestamp
- >
+ AggregationFunctionDeltaSumTimestamp>
{
public:
AggregationFunctionDeltaSumTimestamp(const DataTypes & arguments, const Array & params)
: IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData,
- AggregationFunctionDeltaSumTimestamp
- >{arguments, params, createResultType()}
- {}
+ AggregationFunctionDeltaSumTimestamp>{arguments, params, createResultType()}
+ {
+ }
AggregationFunctionDeltaSumTimestamp()
: IAggregateFunctionDataHelper<
AggregationFunctionDeltaSumTimestampData,
- AggregationFunctionDeltaSumTimestamp
- >{}
- {}
+ AggregationFunctionDeltaSumTimestamp>{}
+ {
+ }
bool allocatesMemoryInArena() const override { return false; }
@@ -63,8 +69,8 @@ public:
void NO_SANITIZE_UNDEFINED ALWAYS_INLINE add(AggregateDataPtr __restrict place, const IColumn ** columns, size_t row_num, Arena *) const override
{
- auto value = assert_cast &>(*columns[0]).getData()[row_num];
- auto ts = assert_cast &>(*columns[1]).getData()[row_num];
+ auto value = unalignedLoad(columns[0]->getRawData().data() + row_num * sizeof(ValueType));
+ auto ts = unalignedLoad(columns[1]->getRawData().data() + row_num * sizeof(TimestampType));
auto & data = this->data(place);
@@ -172,10 +178,48 @@ public:
void insertResultInto(AggregateDataPtr __restrict place, IColumn & to, Arena *) const override
{
- assert_cast &>(to).getData().push_back(this->data(place).sum);
+ static_cast(to).template insertRawData(
+ reinterpret_cast(&this->data(place).sum));
}
};
+
+template class AggregateFunctionTemplate, typename... TArgs>
+IAggregateFunction * createWithTwoTypesSecond(const IDataType & second_type, TArgs && ... args)
+{
+ WhichDataType which(second_type);
+
+ if (which.idx == TypeIndex::UInt32) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::UInt64) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::Int32) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::Int64) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::Float32) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::Float64) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate(args...);
+ if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate(args...);
+
+ return nullptr;
+}
+
+template class AggregateFunctionTemplate, typename... TArgs>
+IAggregateFunction * createWithTwoTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
+{
+ WhichDataType which(first_type);
+
+ if (which.idx == TypeIndex::UInt8) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::UInt16) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::UInt32) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::UInt64) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Int8) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Int16) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Int32) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Int64) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Float32) return createWithTwoTypesSecond(second_type, args...);
+ if (which.idx == TypeIndex::Float64) return createWithTwoTypesSecond(second_type, args...);
+
+ return nullptr;
+}
+
AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
const String & name,
const DataTypes & arguments,
@@ -193,7 +237,7 @@ AggregateFunctionPtr createAggregateFunctionDeltaSumTimestamp(
throw Exception(ErrorCodes::ILLEGAL_TYPE_OF_ARGUMENT, "Illegal type {} of argument for aggregate function {}, "
"must be Int, Float, Date, DateTime", arguments[1]->getName(), name);
- return AggregateFunctionPtr(createWithTwoNumericOrDateTypes(
+ return AggregateFunctionPtr(createWithTwoTypes(
*arguments[0], *arguments[1], arguments, params));
}
}
diff --git a/src/AggregateFunctions/Helpers.h b/src/AggregateFunctions/Helpers.h
index 2b8459b6684..24842b19f1b 100644
--- a/src/AggregateFunctions/Helpers.h
+++ b/src/AggregateFunctions/Helpers.h
@@ -184,36 +184,8 @@ static IAggregateFunction * createWithDecimalType(const IDataType & argument_typ
}
/** For template with two arguments.
+ * This is extremely dangerous for code bloat - do not use.
*/
-template class AggregateFunctionTemplate, typename... TArgs>
-static IAggregateFunction * createWithTwoNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
-{
- WhichDataType which(second_type);
-#define DISPATCH(TYPE) \
- if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate(args...);
- FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
- if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate(args...);
- if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate(args...);
- return nullptr;
-}
-
-template class AggregateFunctionTemplate, typename... TArgs>
-static IAggregateFunction * createWithTwoNumericTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
-{
- WhichDataType which(first_type);
-#define DISPATCH(TYPE) \
- if (which.idx == TypeIndex::TYPE) \
- return createWithTwoNumericTypesSecond(second_type, args...);
- FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
- if (which.idx == TypeIndex::Enum8)
- return createWithTwoNumericTypesSecond(second_type, args...);
- if (which.idx == TypeIndex::Enum16)
- return createWithTwoNumericTypesSecond(second_type, args...);
- return nullptr;
-}
-
template class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithTwoBasicNumericTypesSecond(const IDataType & second_type, TArgs && ... args)
{
@@ -237,46 +209,6 @@ static IAggregateFunction * createWithTwoBasicNumericTypes(const IDataType & fir
return nullptr;
}
-template class AggregateFunctionTemplate, typename... TArgs>
-static IAggregateFunction * createWithTwoNumericOrDateTypesSecond(const IDataType & second_type, TArgs && ... args)
-{
- WhichDataType which(second_type);
-#define DISPATCH(TYPE) \
- if (which.idx == TypeIndex::TYPE) return new AggregateFunctionTemplate(args...);
- FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
- if (which.idx == TypeIndex::Enum8) return new AggregateFunctionTemplate(args...);
- if (which.idx == TypeIndex::Enum16) return new AggregateFunctionTemplate(args...);
-
- /// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
- if (which.idx == TypeIndex::Date) return new AggregateFunctionTemplate(args...);
- if (which.idx == TypeIndex::DateTime) return new AggregateFunctionTemplate(args...);
-
- return nullptr;
-}
-
-template class AggregateFunctionTemplate, typename... TArgs>
-static IAggregateFunction * createWithTwoNumericOrDateTypes(const IDataType & first_type, const IDataType & second_type, TArgs && ... args)
-{
- WhichDataType which(first_type);
-#define DISPATCH(TYPE) \
- if (which.idx == TypeIndex::TYPE) \
- return createWithTwoNumericOrDateTypesSecond(second_type, args...);
- FOR_NUMERIC_TYPES(DISPATCH)
-#undef DISPATCH
- if (which.idx == TypeIndex::Enum8)
- return createWithTwoNumericOrDateTypesSecond(second_type, args...);
- if (which.idx == TypeIndex::Enum16)
- return createWithTwoNumericOrDateTypesSecond(second_type, args...);
-
- /// expects that DataTypeDate based on UInt16, DataTypeDateTime based on UInt32
- if (which.idx == TypeIndex::Date)
- return createWithTwoNumericOrDateTypesSecond(second_type, args...);
- if (which.idx == TypeIndex::DateTime)
- return createWithTwoNumericOrDateTypesSecond(second_type, args...);
- return nullptr;
-}
-
template class AggregateFunctionTemplate, typename... TArgs>
static IAggregateFunction * createWithStringType(const IDataType & argument_type, TArgs && ... args)
{
diff --git a/src/Analyzer/FunctionNode.cpp b/src/Analyzer/FunctionNode.cpp
index 1ad7e7996c7..27b46a112e2 100644
--- a/src/Analyzer/FunctionNode.cpp
+++ b/src/Analyzer/FunctionNode.cpp
@@ -88,6 +88,7 @@ void FunctionNode::resolveAsFunction(FunctionBasePtr function_value)
function_name = function_value->getName();
function = std::move(function_value);
kind = FunctionKind::ORDINARY;
+ nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_function_value)
@@ -95,6 +96,12 @@ void FunctionNode::resolveAsAggregateFunction(AggregateFunctionPtr aggregate_fun
function_name = aggregate_function_value->getName();
function = std::move(aggregate_function_value);
kind = FunctionKind::AGGREGATE;
+ /** When the function is resolved, we do not need the nulls action anymore.
+ * The only thing that the nulls action does is map from one function to another.
+ * Thus, the nulls action is encoded in the function name and does not make sense anymore.
+ * Keeping the nulls action may lead to incorrect comparison of functions, e.g., count() and count() IGNORE NULLS are the same function.
+ */
+ nulls_action = NullsAction::EMPTY;
}
void FunctionNode::resolveAsWindowFunction(AggregateFunctionPtr window_function_value)
diff --git a/src/Analyzer/JoinNode.cpp b/src/Analyzer/JoinNode.cpp
index bf99c014826..722c1e19b7e 100644
--- a/src/Analyzer/JoinNode.cpp
+++ b/src/Analyzer/JoinNode.cpp
@@ -48,9 +48,15 @@ ASTPtr JoinNode::toASTTableJoin() const
auto join_expression_ast = children[join_expression_child_index]->toAST();
if (is_using_join_expression)
- join_ast->using_expression_list = std::move(join_expression_ast);
+ {
+ join_ast->using_expression_list = join_expression_ast;
+ join_ast->children.push_back(join_ast->using_expression_list);
+ }
else
- join_ast->on_expression = std::move(join_expression_ast);
+ {
+ join_ast->on_expression = join_expression_ast;
+ join_ast->children.push_back(join_ast->on_expression);
+ }
}
return join_ast;
diff --git a/src/Analyzer/Passes/FuseFunctionsPass.cpp b/src/Analyzer/Passes/FuseFunctionsPass.cpp
index 17a765a068b..cd2577c3d76 100644
--- a/src/Analyzer/Passes/FuseFunctionsPass.cpp
+++ b/src/Analyzer/Passes/FuseFunctionsPass.cpp
@@ -85,10 +85,9 @@ QueryTreeNodePtr createResolvedFunction(const ContextPtr & context, const String
}
FunctionNodePtr createResolvedAggregateFunction(
- const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {}, NullsAction action = NullsAction::EMPTY)
+ const String & name, const QueryTreeNodePtr & argument, const Array & parameters = {})
{
auto function_node = std::make_shared(name);
- function_node->setNullsAction(action);
if (!parameters.empty())
{
@@ -100,7 +99,7 @@ FunctionNodePtr createResolvedAggregateFunction(
function_node->getArguments().getNodes() = { argument };
AggregateFunctionProperties properties;
- auto aggregate_function = AggregateFunctionFactory::instance().get(name, action, {argument->getResultType()}, parameters, properties);
+ auto aggregate_function = AggregateFunctionFactory::instance().get(name, NullsAction::EMPTY, {argument->getResultType()}, parameters, properties);
function_node->resolveAsAggregateFunction(std::move(aggregate_function));
return function_node;
diff --git a/src/Analyzer/QueryTreePassManager.cpp b/src/Analyzer/QueryTreePassManager.cpp
index 4443f83596f..0005aa6c8bc 100644
--- a/src/Analyzer/QueryTreePassManager.cpp
+++ b/src/Analyzer/QueryTreePassManager.cpp
@@ -3,7 +3,6 @@
#include
#include
-#include "Analyzer/Passes/OptimizeGroupByInjectiveFunctionsPass.h"
#include
#include
@@ -16,39 +15,39 @@
#include
#include
#include
-#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
+#include
#include
#include
-#include
-#include
-#include
-#include
#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
-#include
+#include
#include
-#include
-#include
-#include
-
+#include
+#include
+#include
+#include
namespace DB
{
diff --git a/src/Analyzer/Resolve/QueryAnalyzer.cpp b/src/Analyzer/Resolve/QueryAnalyzer.cpp
index 390418494e7..03ebd893c47 100644
--- a/src/Analyzer/Resolve/QueryAnalyzer.cpp
+++ b/src/Analyzer/Resolve/QueryAnalyzer.cpp
@@ -1,3 +1,4 @@
+#include
#include
#include
@@ -676,6 +677,8 @@ void QueryAnalyzer::evaluateScalarSubqueryIfNeeded(QueryTreeNodePtr & node, Iden
"tuple"});
}
}
+
+ logProcessorProfile(context, io.pipeline.getProcessors());
}
scalars_cache.emplace(node_with_hash, scalar_block);
diff --git a/src/Backups/BackupConcurrencyCheck.cpp b/src/Backups/BackupConcurrencyCheck.cpp
index 8b29ae41b53..a67d241845d 100644
--- a/src/Backups/BackupConcurrencyCheck.cpp
+++ b/src/Backups/BackupConcurrencyCheck.cpp
@@ -14,12 +14,12 @@ namespace ErrorCodes
BackupConcurrencyCheck::BackupConcurrencyCheck(
- const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
+ const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_)
- : is_restore(is_restore_), backup_or_restore_uuid(backup_or_restore_uuid_), on_cluster(on_cluster_), counters(counters_)
+ : is_restore(is_restore_), on_cluster(on_cluster_), zookeeper_path(zookeeper_path_), counters(counters_)
{
std::lock_guard lock{counters.mutex};
@@ -32,7 +32,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_restores = counters.on_cluster_restores.size();
if (on_cluster)
{
- if (!counters.on_cluster_restores.contains(backup_or_restore_uuid))
+ if (!counters.on_cluster_restores.contains(zookeeper_path))
++num_on_cluster_restores;
}
else
@@ -47,7 +47,7 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
size_t num_on_cluster_backups = counters.on_cluster_backups.size();
if (on_cluster)
{
- if (!counters.on_cluster_backups.contains(backup_or_restore_uuid))
+ if (!counters.on_cluster_backups.contains(zookeeper_path))
++num_on_cluster_backups;
}
else
@@ -64,9 +64,9 @@ BackupConcurrencyCheck::BackupConcurrencyCheck(
if (on_cluster)
{
if (is_restore)
- ++counters.on_cluster_restores[backup_or_restore_uuid];
+ ++counters.on_cluster_restores[zookeeper_path];
else
- ++counters.on_cluster_backups[backup_or_restore_uuid];
+ ++counters.on_cluster_backups[zookeeper_path];
}
else
{
@@ -86,7 +86,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
{
if (is_restore)
{
- auto it = counters.on_cluster_restores.find(backup_or_restore_uuid);
+ auto it = counters.on_cluster_restores.find(zookeeper_path);
if (it != counters.on_cluster_restores.end())
{
if (!--it->second)
@@ -95,7 +95,7 @@ BackupConcurrencyCheck::~BackupConcurrencyCheck()
}
else
{
- auto it = counters.on_cluster_backups.find(backup_or_restore_uuid);
+ auto it = counters.on_cluster_backups.find(zookeeper_path);
if (it != counters.on_cluster_backups.end())
{
if (!--it->second)
diff --git a/src/Backups/BackupConcurrencyCheck.h b/src/Backups/BackupConcurrencyCheck.h
index 048a23a716a..a1baeff5464 100644
--- a/src/Backups/BackupConcurrencyCheck.h
+++ b/src/Backups/BackupConcurrencyCheck.h
@@ -1,7 +1,8 @@
#pragma once
-#include
+#include
#include
+#include
#include
#include
@@ -19,9 +20,9 @@ public:
/// Checks concurrency of a BACKUP operation or a RESTORE operation.
/// Keep a constructed instance of BackupConcurrencyCheck until the operation is done.
BackupConcurrencyCheck(
- const UUID & backup_or_restore_uuid_,
bool is_restore_,
bool on_cluster_,
+ const String & zookeeper_path_,
bool allow_concurrency_,
BackupConcurrencyCounters & counters_);
@@ -31,8 +32,8 @@ public:
private:
const bool is_restore;
- const UUID backup_or_restore_uuid;
const bool on_cluster;
+ const String zookeeper_path;
BackupConcurrencyCounters & counters;
};
@@ -47,8 +48,8 @@ private:
friend class BackupConcurrencyCheck;
size_t local_backups TSA_GUARDED_BY(mutex) = 0;
size_t local_restores TSA_GUARDED_BY(mutex) = 0;
- std::unordered_map on_cluster_backups TSA_GUARDED_BY(mutex);
- std::unordered_map on_cluster_restores TSA_GUARDED_BY(mutex);
+ std::unordered_map on_cluster_backups TSA_GUARDED_BY(mutex);
+ std::unordered_map on_cluster_restores TSA_GUARDED_BY(mutex);
std::mutex mutex;
};
diff --git a/src/Backups/BackupCoordinationCleaner.cpp b/src/Backups/BackupCoordinationCleaner.cpp
index 1f5068a94de..47095f27eb3 100644
--- a/src/Backups/BackupCoordinationCleaner.cpp
+++ b/src/Backups/BackupCoordinationCleaner.cpp
@@ -4,31 +4,29 @@
namespace DB
{
-BackupCoordinationCleaner::BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
- : zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
+BackupCoordinationCleaner::BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_)
+ : is_restore(is_restore_), zookeeper_path(zookeeper_path_), with_retries(with_retries_), log(log_)
{
}
-void BackupCoordinationCleaner::cleanup()
+bool BackupCoordinationCleaner::cleanup(bool throw_if_error)
{
- tryRemoveAllNodes(/* throw_if_error = */ true, /* retries_kind = */ WithRetries::kNormal);
+ WithRetries::Kind retries_kind = throw_if_error ? WithRetries::kNormal : WithRetries::kErrorHandling;
+ return cleanupImpl(throw_if_error, retries_kind);
}
-bool BackupCoordinationCleaner::tryCleanupAfterError() noexcept
-{
- return tryRemoveAllNodes(/* throw_if_error = */ false, /* retries_kind = */ WithRetries::kNormal);
-}
-
-bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind)
+bool BackupCoordinationCleaner::cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind)
{
{
std::lock_guard lock{mutex};
- if (cleanup_result.succeeded)
- return true;
- if (cleanup_result.exception)
+ if (succeeded)
{
- if (throw_if_error)
- std::rethrow_exception(cleanup_result.exception);
+ LOG_TRACE(log, "Nodes from ZooKeeper are already removed");
+ return true;
+ }
+ if (tried)
+ {
+ LOG_INFO(log, "Skipped removing nodes from ZooKeeper because earlier we failed to do that");
return false;
}
}
@@ -44,16 +42,18 @@ bool BackupCoordinationCleaner::tryRemoveAllNodes(bool throw_if_error, WithRetri
});
std::lock_guard lock{mutex};
- cleanup_result.succeeded = true;
+ tried = true;
+ succeeded = true;
return true;
}
catch (...)
{
- LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this restore: {}",
+ LOG_TRACE(log, "Caught exception while removing nodes from ZooKeeper for this {}: {}",
+ is_restore ? "restore" : "backup",
getCurrentExceptionMessage(/* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true));
std::lock_guard lock{mutex};
- cleanup_result.exception = std::current_exception();
+ tried = true;
if (throw_if_error)
throw;
diff --git a/src/Backups/BackupCoordinationCleaner.h b/src/Backups/BackupCoordinationCleaner.h
index 43e095d9f33..c760a3611f9 100644
--- a/src/Backups/BackupCoordinationCleaner.h
+++ b/src/Backups/BackupCoordinationCleaner.h
@@ -12,14 +12,14 @@ namespace DB
class BackupCoordinationCleaner
{
public:
- BackupCoordinationCleaner(const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
+ BackupCoordinationCleaner(bool is_restore_, const String & zookeeper_path_, const WithRetries & with_retries_, LoggerPtr log_);
- void cleanup();
- bool tryCleanupAfterError() noexcept;
+ bool cleanup(bool throw_if_error);
private:
- bool tryRemoveAllNodes(bool throw_if_error, WithRetries::Kind retries_kind);
+ bool cleanupImpl(bool throw_if_error, WithRetries::Kind retries_kind);
+ const bool is_restore;
const String zookeeper_path;
/// A reference to a field of the parent object which is either BackupCoordinationOnCluster or RestoreCoordinationOnCluster.
@@ -27,13 +27,8 @@ private:
const LoggerPtr log;
- struct CleanupResult
- {
- bool succeeded = false;
- std::exception_ptr exception;
- };
- CleanupResult cleanup_result TSA_GUARDED_BY(mutex);
-
+ bool tried TSA_GUARDED_BY(mutex) = false;
+ bool succeeded TSA_GUARDED_BY(mutex) = false;
std::mutex mutex;
};
diff --git a/src/Backups/BackupCoordinationLocal.cpp b/src/Backups/BackupCoordinationLocal.cpp
index 8bd6b4d327d..402e789eacb 100644
--- a/src/Backups/BackupCoordinationLocal.cpp
+++ b/src/Backups/BackupCoordinationLocal.cpp
@@ -11,12 +11,11 @@ namespace DB
{
BackupCoordinationLocal::BackupCoordinationLocal(
- const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_)
: log(getLogger("BackupCoordinationLocal"))
- , concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ false, allow_concurrent_backup_, concurrency_counters_)
+ , concurrency_check(/* is_restore = */ false, /* on_cluster = */ false, /* zookeeper_path = */ "", allow_concurrent_backup_, concurrency_counters_)
, file_infos(is_plain_backup_)
{
}
diff --git a/src/Backups/BackupCoordinationLocal.h b/src/Backups/BackupCoordinationLocal.h
index 09991c0d301..e63fcde981a 100644
--- a/src/Backups/BackupCoordinationLocal.h
+++ b/src/Backups/BackupCoordinationLocal.h
@@ -23,20 +23,19 @@ class BackupCoordinationLocal : public IBackupCoordination
{
public:
explicit BackupCoordinationLocal(
- const UUID & backup_uuid_,
bool is_plain_backup_,
bool allow_concurrent_backup_,
BackupConcurrencyCounters & concurrency_counters_);
~BackupCoordinationLocal() override;
+ void setBackupQueryIsSentToOtherHosts() override {}
+ bool isBackupQuerySentToOtherHosts() const override { return false; }
Strings setStage(const String &, const String &, bool) override { return {}; }
- void setBackupQueryWasSentToOtherHosts() override {}
- bool trySetError(std::exception_ptr) override { return true; }
- void finish() override {}
- bool tryFinishAfterError() noexcept override { return true; }
- void waitForOtherHostsToFinish() override {}
- bool tryWaitForOtherHostsToFinishAfterError() noexcept override { return true; }
+ bool setError(std::exception_ptr, bool) override { return true; }
+ bool waitOtherHostsFinish(bool) const override { return true; }
+ bool finish(bool) override { return true; }
+ bool cleanup(bool) override { return true; }
void addReplicatedPartNames(const String & table_zk_path, const String & table_name_for_logs, const String & replica_name,
const std::vector & part_names_and_checksums) override;
diff --git a/src/Backups/BackupCoordinationOnCluster.cpp b/src/Backups/BackupCoordinationOnCluster.cpp
index dc34939f805..1b14f226eff 100644
--- a/src/Backups/BackupCoordinationOnCluster.cpp
+++ b/src/Backups/BackupCoordinationOnCluster.cpp
@@ -184,17 +184,21 @@ BackupCoordinationOnCluster::BackupCoordinationOnCluster(
, plain_backup(is_plain_backup_)
, log(getLogger("BackupCoordinationOnCluster"))
, with_retries(log, get_zookeeper_, keeper_settings, process_list_element_, [root_zookeeper_path_](Coordination::ZooKeeperWithFaultInjection::Ptr zk) { zk->sync(root_zookeeper_path_); })
- , concurrency_check(backup_uuid_, /* is_restore = */ false, /* on_cluster = */ true, allow_concurrent_backup_, concurrency_counters_)
- , stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, with_retries, schedule_, process_list_element_, log)
- , cleaner(zookeeper_path, with_retries, log)
+ , cleaner(/* is_restore = */ false, zookeeper_path, with_retries, log)
+ , stage_sync(/* is_restore = */ false, fs::path{zookeeper_path} / "stage", current_host, all_hosts, allow_concurrent_backup_, concurrency_counters_, with_retries, schedule_, process_list_element_, log)
{
- createRootNodes();
+ try
+ {
+ createRootNodes();
+ }
+ catch (...)
+ {
+ stage_sync.setError(std::current_exception(), /* throw_if_error = */ false);
+ throw;
+ }
}
-BackupCoordinationOnCluster::~BackupCoordinationOnCluster()
-{
- tryFinishImpl();
-}
+BackupCoordinationOnCluster::~BackupCoordinationOnCluster() = default;
void BackupCoordinationOnCluster::createRootNodes()
{
@@ -217,69 +221,52 @@ void BackupCoordinationOnCluster::createRootNodes()
});
}
+void BackupCoordinationOnCluster::setBackupQueryIsSentToOtherHosts()
+{
+ stage_sync.setQueryIsSentToOtherHosts();
+}
+
+bool BackupCoordinationOnCluster::isBackupQuerySentToOtherHosts() const
+{
+ return stage_sync.isQuerySentToOtherHosts();
+}
+
Strings BackupCoordinationOnCluster::setStage(const String & new_stage, const String & message, bool sync)
{
stage_sync.setStage(new_stage, message);
-
- if (!sync)
- return {};
-
- return stage_sync.waitForHostsToReachStage(new_stage, all_hosts_without_initiator);
+ if (sync)
+ return stage_sync.waitHostsReachStage(all_hosts_without_initiator, new_stage);
+ return {};
}
-void BackupCoordinationOnCluster::setBackupQueryWasSentToOtherHosts()
+bool BackupCoordinationOnCluster::setError(std::exception_ptr exception, bool throw_if_error)
{
- backup_query_was_sent_to_other_hosts = true;
+ return stage_sync.setError(exception, throw_if_error);
}
-bool BackupCoordinationOnCluster::trySetError(std::exception_ptr exception)
+bool BackupCoordinationOnCluster::waitOtherHostsFinish(bool throw_if_error) const
{
- return stage_sync.trySetError(exception);
+ return stage_sync.waitOtherHostsFinish(throw_if_error);
}
-void BackupCoordinationOnCluster::finish()
+bool BackupCoordinationOnCluster::finish(bool throw_if_error)
{
- bool other_hosts_also_finished = false;
- stage_sync.finish(other_hosts_also_finished);
-
- if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
- cleaner.cleanup();
+ return stage_sync.finish(throw_if_error);
}
-bool BackupCoordinationOnCluster::tryFinishAfterError() noexcept
+bool BackupCoordinationOnCluster::cleanup(bool throw_if_error)
{
- return tryFinishImpl();
-}
-
-bool BackupCoordinationOnCluster::tryFinishImpl() noexcept
-{
- bool other_hosts_also_finished = false;
- if (!stage_sync.tryFinishAfterError(other_hosts_also_finished))
- return false;
-
- if ((current_host == kInitiator) && (other_hosts_also_finished || !backup_query_was_sent_to_other_hosts))
+ /// All the hosts must finish before we remove the coordination nodes.
+ bool expect_other_hosts_finished = stage_sync.isQuerySentToOtherHosts() || !stage_sync.isErrorSet();
+ bool all_hosts_finished = stage_sync.finished() && (stage_sync.otherHostsFinished() || !expect_other_hosts_finished);
+ if (!all_hosts_finished)
{
- if (!cleaner.tryCleanupAfterError())
- return false;
- }
-
- return true;
-}
-
-void BackupCoordinationOnCluster::waitForOtherHostsToFinish()
-{
- if ((current_host != kInitiator) || !backup_query_was_sent_to_other_hosts)
- return;
- stage_sync.waitForOtherHostsToFinish();
-}
-
-bool BackupCoordinationOnCluster::tryWaitForOtherHostsToFinishAfterError() noexcept
-{
- if (current_host != kInitiator)
+ auto unfinished_hosts = expect_other_hosts_finished ? stage_sync.getUnfinishedHosts() : Strings{current_host};
+ LOG_INFO(log, "Skipping removing nodes from ZooKeeper because hosts {} didn't finish",
+ BackupCoordinationStageSync::getHostsDesc(unfinished_hosts));
return false;
- if (!backup_query_was_sent_to_other_hosts)
- return true;
- return stage_sync.tryWaitForOtherHostsToFinishAfterError();
+ }
+ return cleaner.cleanup(throw_if_error);
}
ZooKeeperRetriesInfo BackupCoordinationOnCluster::getOnClusterInitializationKeeperRetriesInfo() const
diff --git a/src/Backups/BackupCoordinationOnCluster.h b/src/Backups/BackupCoordinationOnCluster.h
index 7369c2cc746..b439ab619d8 100644
--- a/src/Backups/BackupCoordinationOnCluster.h
+++ b/src/Backups/BackupCoordinationOnCluster.h
@@ -1,7 +1,6 @@
#pragma once
#include
-#include
#include
#include
#include
@@ -20,7 +19,7 @@ class BackupCoordinationOnCluster : public IBackupCoordination
{
public:
/// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER query.
- static const constexpr std::string_view kInitiator;
+ static const constexpr std::string_view kInitiator = BackupCoordinationStageSync::kInitiator;
BackupCoordinationOnCluster(
const UUID & backup_uuid_,
@@ -37,13 +36,13 @@ public:
~BackupCoordinationOnCluster() override;
+ void setBackupQueryIsSentToOtherHosts() override;
+ bool isBackupQuerySentToOtherHosts() const override;
Strings setStage(const String & new_stage, const String & message, bool sync) override;
- void setBackupQueryWasSentToOtherHosts() override;
- bool trySetError(std::exception_ptr exception) override;
- void finish() override;
- bool tryFinishAfterError() noexcept override;
- void waitForOtherHostsToFinish() override;
- bool tryWaitForOtherHostsToFinishAfterError() noexcept override;
+ bool setError(std::exception_ptr exception, bool throw_if_error) override;
+ bool waitOtherHostsFinish(bool throw_if_error) const override;
+ bool finish(bool throw_if_error) override;
+ bool cleanup(bool throw_if_error) override;
void addReplicatedPartNames(
const String & table_zk_path,
@@ -110,11 +109,10 @@ private:
const bool plain_backup;
LoggerPtr const log;
+ /// The order is important: `stage_sync` must be initialized after `with_retries` and `cleaner`.
const WithRetries with_retries;
- BackupConcurrencyCheck concurrency_check;
- BackupCoordinationStageSync stage_sync;
BackupCoordinationCleaner cleaner;
- std::atomic<bool> backup_query_was_sent_to_other_hosts = false;
+ BackupCoordinationStageSync stage_sync;
mutable std::optional<BackupCoordinationReplicatedTables> replicated_tables TSA_GUARDED_BY(replicated_tables_mutex);
mutable std::optional<BackupCoordinationReplicatedAccess> replicated_access TSA_GUARDED_BY(replicated_access_mutex);
diff --git a/src/Backups/BackupCoordinationStageSync.cpp b/src/Backups/BackupCoordinationStageSync.cpp
index 9a05f9490c2..fcf09d7c315 100644
--- a/src/Backups/BackupCoordinationStageSync.cpp
+++ b/src/Backups/BackupCoordinationStageSync.cpp
@@ -42,9 +42,6 @@ namespace
kCurrentVersion = 2,
};
-
- /// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER or RESTORE ON CLUSTER query.
- const constexpr std::string_view kInitiator;
}
bool BackupCoordinationStageSync::HostInfo::operator ==(const HostInfo & other) const
@@ -63,12 +60,32 @@ bool BackupCoordinationStageSync::State::operator ==(const State & other) const
bool BackupCoordinationStageSync::State::operator !=(const State & other) const = default;
+void BackupCoordinationStageSync::State::merge(const State & other)
+{
+ if (other.host_with_error && !host_with_error)
+ {
+ const String & host = *other.host_with_error;
+ host_with_error = host;
+ hosts.at(host).exception = other.hosts.at(host).exception;
+ }
+
+ for (const auto & [host, other_host_info] : other.hosts)
+ {
+ auto & host_info = hosts.at(host);
+ host_info.stages.insert(other_host_info.stages.begin(), other_host_info.stages.end());
+ if (other_host_info.finished)
+ host_info.finished = true;
+ }
+}
+
+
BackupCoordinationStageSync::BackupCoordinationStageSync(
bool is_restore_,
const String & zookeeper_path_,
const String & current_host_,
const Strings & all_hosts_,
bool allow_concurrency_,
+ BackupConcurrencyCounters & concurrency_counters_,
const WithRetries & with_retries_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
QueryStatusPtr process_list_element_,
@@ -89,35 +106,29 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
, max_attempts_after_bad_version(with_retries.getKeeperSettings().max_attempts_after_bad_version)
, zookeeper_path(zookeeper_path_)
, root_zookeeper_path(zookeeper_path.parent_path().parent_path())
- , operation_node_path(zookeeper_path.parent_path())
+ , operation_zookeeper_path(zookeeper_path.parent_path())
, operation_node_name(zookeeper_path.parent_path().filename())
- , stage_node_path(zookeeper_path)
, start_node_path(zookeeper_path / ("started|" + current_host))
, finish_node_path(zookeeper_path / ("finished|" + current_host))
, num_hosts_node_path(zookeeper_path / "num_hosts")
+ , error_node_path(zookeeper_path / "error")
, alive_node_path(zookeeper_path / ("alive|" + current_host))
, alive_tracker_node_path(fs::path{root_zookeeper_path} / "alive_tracker")
- , error_node_path(zookeeper_path / "error")
, zk_nodes_changed(std::make_shared<Poco::Event>())
{
- if ((zookeeper_path.filename() != "stage") || !operation_node_name.starts_with(is_restore ? "restore-" : "backup-")
- || (root_zookeeper_path == operation_node_path))
- {
- throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected path in ZooKeeper specified: {}", zookeeper_path);
- }
-
initializeState();
createRootNodes();
try
{
+ concurrency_check.emplace(is_restore, /* on_cluster = */ true, zookeeper_path, allow_concurrency, concurrency_counters_);
createStartAndAliveNodes();
startWatchingThread();
}
catch (...)
{
- trySetError(std::current_exception());
- tryFinishImpl();
+ if (setError(std::current_exception(), /* throw_if_error = */ false))
+ finish(/* throw_if_error = */ false);
throw;
}
}
@@ -125,7 +136,26 @@ BackupCoordinationStageSync::BackupCoordinationStageSync(
BackupCoordinationStageSync::~BackupCoordinationStageSync()
{
- tryFinishImpl();
+ /// Normally either finish() or setError() must be called.
+ if (!tried_to_finish)
+ {
+ if (state.host_with_error)
+ {
+ /// setError() was called and succeeded.
+ finish(/* throw_if_error = */ false);
+ }
+ else if (!tried_to_set_error)
+ {
+ /// Neither finish() nor setError() were called, it's a bug.
+ chassert(false, "~BackupCoordinationStageSync() is called without finish() or setError()");
+ LOG_ERROR(log, "~BackupCoordinationStageSync() is called without finish() or setError()");
+ }
+ }
+
+ /// Normally the watching thread should be stopped already because the finish() function stops it.
+ /// However if an error happened then the watching thread can be still running,
+ /// so here in the destructor we have to ensure that it's stopped.
+ stopWatchingThread();
}
@@ -137,6 +167,12 @@ void BackupCoordinationStageSync::initializeState()
for (const String & host : all_hosts)
state.hosts.emplace(host, HostInfo{.host = host, .last_connection_time = now, .last_connection_time_monotonic = monotonic_now});
+
+ if (!state.hosts.contains(current_host))
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "List of hosts must contain the current host");
+
+ if (!state.hosts.contains(String{kInitiator}))
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "List of hosts must contain the initiator");
}
@@ -179,6 +215,12 @@ String BackupCoordinationStageSync::getHostsDesc(const Strings & hosts)
void BackupCoordinationStageSync::createRootNodes()
{
+ if ((zookeeper_path.filename() != "stage") || !operation_node_name.starts_with(is_restore ? "restore-" : "backup-")
+ || (root_zookeeper_path == operation_zookeeper_path))
+ {
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "Unexpected path in ZooKeeper specified: {}", zookeeper_path);
+ }
+
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::createRootNodes", WithRetries::kInitialization);
holder.retries_ctl.retryLoop(
[&, &zookeeper = holder.faulty_zookeeper]()
@@ -252,27 +294,27 @@ void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeep
Coordination::Requests requests;
requests.reserve(6);
- size_t operation_node_path_pos = static_cast<size_t>(-1);
- if (!zookeeper->exists(operation_node_path))
+ size_t operation_node_pos = static_cast<size_t>(-1);
+ if (!zookeeper->exists(operation_zookeeper_path))
{
- operation_node_path_pos = requests.size();
- requests.emplace_back(zkutil::makeCreateRequest(operation_node_path, "", zkutil::CreateMode::Persistent));
+ operation_node_pos = requests.size();
+ requests.emplace_back(zkutil::makeCreateRequest(operation_zookeeper_path, "", zkutil::CreateMode::Persistent));
}
- size_t stage_node_path_pos = static_cast<size_t>(-1);
- if (!zookeeper->exists(stage_node_path))
+ size_t zookeeper_path_pos = static_cast<size_t>(-1);
+ if (!zookeeper->exists(zookeeper_path))
{
- stage_node_path_pos = requests.size();
- requests.emplace_back(zkutil::makeCreateRequest(stage_node_path, "", zkutil::CreateMode::Persistent));
+ zookeeper_path_pos = requests.size();
+ requests.emplace_back(zkutil::makeCreateRequest(zookeeper_path, "", zkutil::CreateMode::Persistent));
}
- size_t num_hosts_node_path_pos = requests.size();
+ size_t num_hosts_node_pos = requests.size();
if (num_hosts)
requests.emplace_back(zkutil::makeSetRequest(num_hosts_node_path, toString(*num_hosts + 1), num_hosts_version));
else
requests.emplace_back(zkutil::makeCreateRequest(num_hosts_node_path, "1", zkutil::CreateMode::Persistent));
- size_t alive_tracker_node_path_pos = requests.size();
+ size_t alive_tracker_node_pos = requests.size();
requests.emplace_back(zkutil::makeSetRequest(alive_tracker_node_path, "", alive_tracker_version));
requests.emplace_back(zkutil::makeCreateRequest(start_node_path, std::to_string(kCurrentVersion), zkutil::CreateMode::Persistent));
@@ -284,7 +326,7 @@ void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeep
if (code == Coordination::Error::ZOK)
{
LOG_INFO(log, "Created start node #{} in ZooKeeper for {} (coordination version: {})",
- num_hosts.value_or(0) + 1, current_host_desc, kCurrentVersion);
+ num_hosts.value_or(0) + 1, current_host_desc, static_cast<int>(kCurrentVersion));
return;
}
@@ -294,40 +336,34 @@ void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeep
LOG_TRACE(log, "{} (attempt #{}){}", message, attempt_no, will_try_again ? ", will try again" : "");
};
- if ((responses.size() > operation_node_path_pos) &&
- (responses[operation_node_path_pos]->error == Coordination::Error::ZNODEEXISTS))
+ if ((operation_node_pos < responses.size()) &&
+ (responses[operation_node_pos]->error == Coordination::Error::ZNODEEXISTS))
{
- show_error_before_next_attempt(fmt::format("Node {} in ZooKeeper already exists", operation_node_path));
+ show_error_before_next_attempt(fmt::format("Node {} already exists", operation_zookeeper_path));
/// needs another attempt
}
- else if ((responses.size() > stage_node_path_pos) &&
- (responses[stage_node_path_pos]->error == Coordination::Error::ZNODEEXISTS))
+ else if ((zookeeper_path_pos < responses.size()) &&
+ (responses[zookeeper_path_pos]->error == Coordination::Error::ZNODEEXISTS))
{
- show_error_before_next_attempt(fmt::format("Node {} in ZooKeeper already exists", stage_node_path));
+ show_error_before_next_attempt(fmt::format("Node {} already exists", zookeeper_path));
/// needs another attempt
}
- else if ((responses.size() > num_hosts_node_path_pos) && num_hosts &&
- (responses[num_hosts_node_path_pos]->error == Coordination::Error::ZBADVERSION))
+ else if ((num_hosts_node_pos < responses.size()) && !num_hosts &&
+ (responses[num_hosts_node_pos]->error == Coordination::Error::ZNODEEXISTS))
{
- show_error_before_next_attempt("Other host changed the 'num_hosts' node in ZooKeeper");
+ show_error_before_next_attempt(fmt::format("Node {} already exists", num_hosts_node_path));
+ /// needs another attempt
+ }
+ else if ((num_hosts_node_pos < responses.size()) && num_hosts &&
+ (responses[num_hosts_node_pos]->error == Coordination::Error::ZBADVERSION))
+ {
+ show_error_before_next_attempt(fmt::format("The version of node {} changed", num_hosts_node_path));
num_hosts.reset(); /// needs to reread 'num_hosts' again
}
- else if ((responses.size() > num_hosts_node_path_pos) && num_hosts &&
- (responses[num_hosts_node_path_pos]->error == Coordination::Error::ZNONODE))
+ else if ((alive_tracker_node_pos < responses.size()) &&
+ (responses[alive_tracker_node_pos]->error == Coordination::Error::ZBADVERSION))
{
- show_error_before_next_attempt("Other host removed the 'num_hosts' node in ZooKeeper");
- num_hosts.reset(); /// needs to reread 'num_hosts' again
- }
- else if ((responses.size() > num_hosts_node_path_pos) && !num_hosts &&
- (responses[num_hosts_node_path_pos]->error == Coordination::Error::ZNODEEXISTS))
- {
- show_error_before_next_attempt("Other host created the 'num_hosts' node in ZooKeeper");
- /// needs another attempt
- }
- else if ((responses.size() > alive_tracker_node_path_pos) &&
- (responses[alive_tracker_node_path_pos]->error == Coordination::Error::ZBADVERSION))
- {
- show_error_before_next_attempt("Concurrent backup or restore changed some 'alive' nodes in ZooKeeper");
+ show_error_before_next_attempt(fmt::format("The version of node {} changed", alive_tracker_node_path));
check_concurrency = true; /// needs to recheck for concurrency again
}
else
@@ -337,8 +373,7 @@ void BackupCoordinationStageSync::createStartAndAliveNodes(Coordination::ZooKeep
}
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
- "Couldn't create the 'start' node in ZooKeeper for {} after {} attempts",
- current_host_desc, max_attempts_after_bad_version);
+ "Couldn't create node {} in ZooKeeper after {} attempts", start_node_path, max_attempts_after_bad_version);
}
@@ -387,36 +422,53 @@ void BackupCoordinationStageSync::startWatchingThread()
void BackupCoordinationStageSync::stopWatchingThread()
{
- should_stop_watching_thread = true;
+ {
+ std::lock_guard lock{mutex};
+ if (should_stop_watching_thread)
+ return;
+ should_stop_watching_thread = true;
- /// Wake up waiting threads.
- if (zk_nodes_changed)
- zk_nodes_changed->set();
- state_changed.notify_all();
+ /// Wake up waiting threads.
+ if (zk_nodes_changed)
+ zk_nodes_changed->set();
+ state_changed.notify_all();
+ }
if (watching_thread_future.valid())
watching_thread_future.wait();
+
+ LOG_TRACE(log, "Stopped the watching thread");
}
void BackupCoordinationStageSync::watchingThread()
{
- while (!should_stop_watching_thread)
+ auto should_stop = [&]
+ {
+ std::lock_guard lock{mutex};
+ return should_stop_watching_thread;
+ };
+
+ while (!should_stop())
{
try
{
/// Check if the current BACKUP or RESTORE command is already cancelled.
checkIfQueryCancelled();
+ }
+ catch (...)
+ {
+ tryLogCurrentException(log, "Caught exception while watching");
+ }
- /// Reset the `connected` flag for each host, we'll set them to true again after we find the 'alive' nodes.
- resetConnectedFlag();
-
+ try
+ {
/// Recreate the 'alive' node if necessary and read a new state from ZooKeeper.
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::watchingThread");
auto & zookeeper = holder.faulty_zookeeper;
with_retries.renewZooKeeper(zookeeper);
- if (should_stop_watching_thread)
+ if (should_stop())
return;
/// Recreate the 'alive' node if it was removed.
@@ -427,7 +479,10 @@ void BackupCoordinationStageSync::watchingThread()
}
catch (...)
{
- tryLogCurrentException(log, "Caugth exception while watching");
+ tryLogCurrentException(log, "Caught exception while watching");
+
+ /// Reset the `connected` flag for each host, we'll set them to true again after we find the 'alive' nodes.
+ resetConnectedFlag();
}
try
@@ -438,7 +493,7 @@ void BackupCoordinationStageSync::watchingThread()
}
catch (...)
{
- tryLogCurrentException(log, "Caugth exception while checking if the query should be cancelled");
+ tryLogCurrentException(log, "Caught exception while watching");
}
zk_nodes_changed->tryWait(sync_period_ms.count());
@@ -473,7 +528,7 @@ void BackupCoordinationStageSync::readCurrentState(Coordination::ZooKeeperWithFa
zk_nodes_changed->reset();
/// Get zk nodes and subscribe on their changes.
- Strings new_zk_nodes = zookeeper->getChildren(stage_node_path, nullptr, zk_nodes_changed);
+ Strings new_zk_nodes = zookeeper->getChildren(zookeeper_path, nullptr, zk_nodes_changed);
std::sort(new_zk_nodes.begin(), new_zk_nodes.end()); /// Sorting is necessary because we compare the list of zk nodes with its previous versions.
State new_state;
@@ -492,6 +547,8 @@ void BackupCoordinationStageSync::readCurrentState(Coordination::ZooKeeperWithFa
zk_nodes = new_zk_nodes;
new_state = state;
+ for (auto & [_, host_info] : new_state.hosts)
+ host_info.connected = false;
}
auto get_host_info = [&](const String & host) -> HostInfo *
@@ -514,7 +571,8 @@ void BackupCoordinationStageSync::readCurrentState(Coordination::ZooKeeperWithFa
{
String serialized_error = zookeeper->get(error_node_path);
auto [exception, host] = parseErrorNode(serialized_error);
- if (auto * host_info = get_host_info(host))
+ auto * host_info = get_host_info(host);
+ if (exception && host_info)
{
host_info->exception = exception;
new_state.host_with_error = host;
@@ -576,6 +634,9 @@ void BackupCoordinationStageSync::readCurrentState(Coordination::ZooKeeperWithFa
{
std::lock_guard lock{mutex};
+ /// We were reading `new_state` from ZooKeeper with `mutex` unlocked, so `state` could get more information during that reading,
+ /// we don't want to lose that information, that's why we use merge() here.
+ new_state.merge(state);
was_state_changed = (new_state != state);
state = std::move(new_state);
}
@@ -604,26 +665,10 @@ int BackupCoordinationStageSync::parseStartNode(const String & start_node_conten
}
-std::pair<std::exception_ptr, String> BackupCoordinationStageSync::parseErrorNode(const String & error_node_contents)
-{
- ReadBufferFromOwnString buf{error_node_contents};
- String host;
- readStringBinary(host, buf);
- auto exception = std::make_exception_ptr(readException(buf, fmt::format("Got error from {}", getHostDesc(host))));
- return {exception, host};
-}
-
-
void BackupCoordinationStageSync::checkIfQueryCancelled()
{
if (process_list_element->checkTimeLimitSoft())
return; /// Not cancelled.
-
- std::lock_guard lock{mutex};
- if (state.cancelled)
- return; /// Already marked as cancelled.
-
- state.cancelled = true;
state_changed.notify_all();
}
@@ -634,13 +679,13 @@ void BackupCoordinationStageSync::cancelQueryIfError()
{
std::lock_guard lock{mutex};
- if (state.cancelled || !state.host_with_error)
+ if (!state.host_with_error)
return;
- state.cancelled = true;
exception = state.hosts.at(*state.host_with_error).exception;
}
+ chassert(exception);
process_list_element->cancelQuery(false, exception);
state_changed.notify_all();
}
@@ -652,7 +697,7 @@ void BackupCoordinationStageSync::cancelQueryIfDisconnectedTooLong()
{
std::lock_guard lock{mutex};
- if (state.cancelled || state.host_with_error || ((failure_after_host_disconnected_for_seconds.count() == 0)))
+ if (state.host_with_error || ((failure_after_host_disconnected_for_seconds.count() == 0)))
return;
auto monotonic_now = std::chrono::steady_clock::now();
@@ -685,27 +730,92 @@ void BackupCoordinationStageSync::cancelQueryIfDisconnectedTooLong()
}
}
}
-
- if (!exception)
- return;
-
- state.cancelled = true;
}
+ if (!exception)
+ return;
+
process_list_element->cancelQuery(false, exception);
state_changed.notify_all();
}
+void BackupCoordinationStageSync::setQueryIsSentToOtherHosts()
+{
+ std::lock_guard lock{mutex};
+ query_is_sent_to_other_hosts = true;
+}
+
+bool BackupCoordinationStageSync::isQuerySentToOtherHosts() const
+{
+ std::lock_guard lock{mutex};
+ return query_is_sent_to_other_hosts;
+}
+
+
void BackupCoordinationStageSync::setStage(const String & stage, const String & stage_result)
{
LOG_INFO(log, "{} reached stage {}", current_host_desc, stage);
+
+ {
+ std::lock_guard lock{mutex};
+ if (state.hosts.at(current_host).stages.contains(stage))
+ return; /// Already set.
+ }
+
+ if ((getInitiatorVersion() == kVersionWithoutFinishNode) && (stage == BackupCoordinationStage::COMPLETED))
+ {
+ LOG_TRACE(log, "Stopping the watching thread because the initiator uses outdated version {}", getInitiatorVersion());
+ stopWatchingThread();
+ }
+
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setStage");
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
- zookeeper->createIfNotExists(getStageNodePath(stage), stage_result);
+ createStageNode(stage, stage_result, zookeeper);
});
+
+ /// If the initiator of the query has that old version then it doesn't expect us to create the 'finish' node and moreover
+ /// the initiator can start removing all the nodes immediately after all hosts report about reaching the "completed" status.
+ /// So to avoid weird errors in the logs we won't create the 'finish' node if the initiator of the query has that old version.
+ if ((getInitiatorVersion() == kVersionWithoutFinishNode) && (stage == BackupCoordinationStage::COMPLETED))
+ {
+ LOG_INFO(log, "Skipped creating the 'finish' node because the initiator uses outdated version {}", getInitiatorVersion());
+ std::lock_guard lock{mutex};
+ tried_to_finish = true;
+ state.hosts.at(current_host).finished = true;
+ }
+}
+
+
+void BackupCoordinationStageSync::createStageNode(const String & stage, const String & stage_result, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
+{
+ String serialized_error;
+ if (zookeeper->tryGet(error_node_path, serialized_error))
+ {
+ auto [exception, host] = parseErrorNode(serialized_error);
+ if (exception)
+ std::rethrow_exception(exception);
+ }
+
+ auto code = zookeeper->tryCreate(getStageNodePath(stage), stage_result, zkutil::CreateMode::Persistent);
+ if (code == Coordination::Error::ZOK)
+ {
+ std::lock_guard lock{mutex};
+ state.hosts.at(current_host).stages[stage] = stage_result;
+ return;
+ }
+
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ String another_result = zookeeper->get(getStageNodePath(stage));
+ std::lock_guard lock{mutex};
+ state.hosts.at(current_host).stages[stage] = another_result;
+ return;
+ }
+
+ throw zkutil::KeeperException::fromPath(code, getStageNodePath(stage));
}
@@ -715,71 +825,7 @@ String BackupCoordinationStageSync::getStageNodePath(const String & stage) const
}
-bool BackupCoordinationStageSync::trySetError(std::exception_ptr exception) noexcept
-{
- try
- {
- std::rethrow_exception(exception);
- }
- catch (const Exception & e)
- {
- return trySetError(e);
- }
- catch (...)
- {
- return trySetError(Exception(getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()));
- }
-}
-
-
-bool BackupCoordinationStageSync::trySetError(const Exception & exception)
-{
- try
- {
- setError(exception);
- return true;
- }
- catch (...)
- {
- return false;
- }
-}
-
-
-void BackupCoordinationStageSync::setError(const Exception & exception)
-{
- /// Most likely this exception has been already logged so here we're logging it without stacktrace.
- String exception_message = getExceptionMessage(exception, /* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true);
- LOG_INFO(log, "Sending exception from {} to other hosts: {}", current_host_desc, exception_message);
-
- auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setError", WithRetries::kErrorHandling);
-
- holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
- {
- with_retries.renewZooKeeper(zookeeper);
-
- WriteBufferFromOwnString buf;
- writeStringBinary(current_host, buf);
- writeException(exception, buf, true);
- auto code = zookeeper->tryCreate(error_node_path, buf.str(), zkutil::CreateMode::Persistent);
-
- if (code == Coordination::Error::ZOK)
- {
- LOG_TRACE(log, "Sent exception from {} to other hosts", current_host_desc);
- }
- else if (code == Coordination::Error::ZNODEEXISTS)
- {
- LOG_INFO(log, "An error has been already assigned for this {}", operation_name);
- }
- else
- {
- throw zkutil::KeeperException::fromPath(code, error_node_path);
- }
- });
-}
-
-
-Strings BackupCoordinationStageSync::waitForHostsToReachStage(const String & stage_to_wait, const Strings & hosts, std::optional timeout) const
+Strings BackupCoordinationStageSync::waitHostsReachStage(const Strings & hosts, const String & stage_to_wait) const
{
Strings results;
results.resize(hosts.size());
@@ -787,44 +833,28 @@ Strings BackupCoordinationStageSync::waitForHostsToReachStage(const String & sta
std::unique_lock lock{mutex};
/// TSA_NO_THREAD_SAFETY_ANALYSIS is here because Clang Thread Safety Analysis doesn't understand std::unique_lock.
- auto check_if_hosts_ready = [&](bool time_is_out) TSA_NO_THREAD_SAFETY_ANALYSIS
+ auto check_if_hosts_reach_stage = [&]() TSA_NO_THREAD_SAFETY_ANALYSIS
{
- return checkIfHostsReachStage(hosts, stage_to_wait, time_is_out, timeout, results);
+ return checkIfHostsReachStage(hosts, stage_to_wait, results);
};
- if (timeout)
- {
- if (!state_changed.wait_for(lock, *timeout, [&] { return check_if_hosts_ready(/* time_is_out = */ false); }))
- check_if_hosts_ready(/* time_is_out = */ true);
- }
- else
- {
- state_changed.wait(lock, [&] { return check_if_hosts_ready(/* time_is_out = */ false); });
- }
+ state_changed.wait(lock, check_if_hosts_reach_stage);
return results;
}
-bool BackupCoordinationStageSync::checkIfHostsReachStage(
- const Strings & hosts,
- const String & stage_to_wait,
- bool time_is_out,
- std::optional timeout,
- Strings & results) const
+bool BackupCoordinationStageSync::checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, Strings & results) const
{
- if (should_stop_watching_thread)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "finish() was called while waiting for a stage");
-
process_list_element->checkTimeLimit();
for (size_t i = 0; i != hosts.size(); ++i)
{
const String & host = hosts[i];
auto it = state.hosts.find(host);
-
if (it == state.hosts.end())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "waitForHostsToReachStage() was called for unexpected {}, all hosts are {}", getHostDesc(host), getHostsDesc(all_hosts));
+ throw Exception(ErrorCodes::LOGICAL_ERROR,
+ "waitHostsReachStage() was called for unexpected {}, all hosts are {}", getHostDesc(host), getHostsDesc(all_hosts));
const HostInfo & host_info = it->second;
auto stage_it = host_info.stages.find(stage_to_wait);
@@ -835,10 +865,11 @@ bool BackupCoordinationStageSync::checkIfHostsReachStage(
}
if (host_info.finished)
- {
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"{} finished without coming to stage {}", getHostDesc(host), stage_to_wait);
- }
+
+ if (should_stop_watching_thread)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "waitHostsReachStage() can't wait for stage {} after the watching thread stopped", stage_to_wait);
String host_status;
if (!host_info.started)
@@ -846,85 +877,73 @@ bool BackupCoordinationStageSync::checkIfHostsReachStage(
else if (!host_info.connected)
host_status = fmt::format(": the host is currently disconnected, last connection was at {}", host_info.last_connection_time);
- if (!time_is_out)
- {
- LOG_TRACE(log, "Waiting for {} to reach stage {}{}", getHostDesc(host), stage_to_wait, host_status);
- return false;
- }
- else
- {
- throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
- "Waited longer than timeout {} for {} to reach stage {}{}",
- *timeout, getHostDesc(host), stage_to_wait, host_status);
- }
+ LOG_TRACE(log, "Waiting for {} to reach stage {}{}", getHostDesc(host), stage_to_wait, host_status);
+ return false; /// wait for next change of `state_changed`
}
LOG_INFO(log, "Hosts {} reached stage {}", getHostsDesc(hosts), stage_to_wait);
- return true;
+ return true; /// stop waiting
}
-void BackupCoordinationStageSync::finish(bool & other_hosts_also_finished)
+bool BackupCoordinationStageSync::finish(bool throw_if_error)
{
- tryFinishImpl(other_hosts_also_finished, /* throw_if_error = */ true, /* retries_kind = */ WithRetries::kNormal);
+ WithRetries::Kind retries_kind = WithRetries::kNormal;
+ if (throw_if_error)
+ retries_kind = WithRetries::kErrorHandling;
+
+ return finishImpl(throw_if_error, retries_kind);
}
-bool BackupCoordinationStageSync::tryFinishAfterError(bool & other_hosts_also_finished) noexcept
+bool BackupCoordinationStageSync::finishImpl(bool throw_if_error, WithRetries::Kind retries_kind)
{
- return tryFinishImpl(other_hosts_also_finished, /* throw_if_error = */ false, /* retries_kind = */ WithRetries::kErrorHandling);
-}
-
-
-bool BackupCoordinationStageSync::tryFinishImpl()
-{
- bool other_hosts_also_finished;
- return tryFinishAfterError(other_hosts_also_finished);
-}
-
-
-bool BackupCoordinationStageSync::tryFinishImpl(bool & other_hosts_also_finished, bool throw_if_error, WithRetries::Kind retries_kind)
-{
- auto get_value_other_hosts_also_finished = [&] TSA_REQUIRES(mutex)
- {
- other_hosts_also_finished = true;
- for (const auto & [host, host_info] : state.hosts)
- {
- if ((host != current_host) && !host_info.finished)
- other_hosts_also_finished = false;
- }
- };
-
{
std::lock_guard lock{mutex};
- if (finish_result.succeeded)
+
+ if (finishedNoLock())
{
- get_value_other_hosts_also_finished();
+ LOG_INFO(log, "The finish node for {} already exists", current_host_desc);
return true;
}
- if (finish_result.exception)
+
+ if (tried_to_finish)
{
- if (throw_if_error)
- std::rethrow_exception(finish_result.exception);
+ /// We don't repeat creating the finish node, no matter if it was successful or not.
+ LOG_INFO(log, "Skipped creating the finish node for {} because earlier we failed to do that", current_host_desc);
return false;
}
+
+ bool failed_to_set_error = tried_to_set_error && !state.host_with_error;
+ if (failed_to_set_error)
+ {
+ /// Tried to create the 'error' node, but failed.
+ /// Then it's better not to create the 'finish' node in this case because otherwise other hosts might think we've succeeded.
+ LOG_INFO(log, "Skipping creating the finish node for {} because there was an error which we were unable to send to other hosts", current_host_desc);
+ return false;
+ }
+
+ if (current_host == kInitiator)
+ {
+ /// Normally the initiator should wait for other hosts to finish before creating its own finish node.
+ /// We show warning if some of the other hosts didn't finish.
+ bool expect_other_hosts_finished = query_is_sent_to_other_hosts || !state.host_with_error;
+ bool other_hosts_finished = otherHostsFinishedNoLock() || !expect_other_hosts_finished;
+ if (!other_hosts_finished)
+ LOG_WARNING(log, "Hosts {} didn't finish before the initiator", getHostsDesc(getUnfinishedOtherHostsNoLock()));
+ }
}
+ stopWatchingThread();
+
try
{
- stopWatchingThread();
-
auto holder = with_retries.createRetriesControlHolder("BackupStageSync::finish", retries_kind);
holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
{
with_retries.renewZooKeeper(zookeeper);
- createFinishNodeAndRemoveAliveNode(zookeeper);
+ createFinishNodeAndRemoveAliveNode(zookeeper, throw_if_error);
});
-
- std::lock_guard lock{mutex};
- finish_result.succeeded = true;
- get_value_other_hosts_also_finished();
- return true;
}
catch (...)
{
@@ -933,63 +952,87 @@ bool BackupCoordinationStageSync::tryFinishImpl(bool & other_hosts_also_finished
getCurrentExceptionMessage(/* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true));
std::lock_guard lock{mutex};
- finish_result.exception = std::current_exception();
+ tried_to_finish = true;
+
if (throw_if_error)
throw;
return false;
}
+
+ {
+ std::lock_guard lock{mutex};
+ tried_to_finish = true;
+ state.hosts.at(current_host).finished = true;
+ }
+
+ return true;
}
-void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
+void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper, bool throw_if_error)
{
- if (zookeeper->exists(finish_node_path))
- return;
-
- /// If the initiator of the query has that old version then it doesn't expect us to create the 'finish' node and moreover
- /// the initiator can start removing all the nodes immediately after all hosts report about reaching the "completed" status.
- /// So to avoid weird errors in the logs we won't create the 'finish' node if the initiator of the query has that old version.
- if ((getInitiatorVersion() == kVersionWithoutFinishNode) && (current_host != kInitiator))
- {
- LOG_INFO(log, "Skipped creating the 'finish' node because the initiator uses outdated version {}", getInitiatorVersion());
- return;
- }
-
std::optional<size_t> num_hosts;
int num_hosts_version = -1;
for (size_t attempt_no = 1; attempt_no <= max_attempts_after_bad_version; ++attempt_no)
{
+ /// The 'num_hosts' node may not exist if createStartAndAliveNodes() failed in the constructor.
if (!num_hosts)
{
+ String num_hosts_str;
Coordination::Stat stat;
- num_hosts = parseFromString(zookeeper->get(num_hosts_node_path, &stat));
- num_hosts_version = stat.version;
+ if (zookeeper->tryGet(num_hosts_node_path, num_hosts_str, &stat))
+ {
+ num_hosts = parseFromString<size_t>(num_hosts_str);
+ num_hosts_version = stat.version;
+ }
}
+ String serialized_error;
+ if (throw_if_error && zookeeper->tryGet(error_node_path, serialized_error))
+ {
+ auto [exception, host] = parseErrorNode(serialized_error);
+ if (exception)
+ std::rethrow_exception(exception);
+ }
+
+ if (zookeeper->exists(finish_node_path))
+ return;
+
+ bool start_node_exists = zookeeper->exists(start_node_path);
+
Coordination::Requests requests;
requests.reserve(3);
requests.emplace_back(zkutil::makeCreateRequest(finish_node_path, "", zkutil::CreateMode::Persistent));
- size_t num_hosts_node_path_pos = requests.size();
- requests.emplace_back(zkutil::makeSetRequest(num_hosts_node_path, toString(*num_hosts - 1), num_hosts_version));
-
- size_t alive_node_path_pos = static_cast<size_t>(-1);
+ size_t alive_node_pos = static_cast<size_t>(-1);
if (zookeeper->exists(alive_node_path))
{
- alive_node_path_pos = requests.size();
+ alive_node_pos = requests.size();
requests.emplace_back(zkutil::makeRemoveRequest(alive_node_path, -1));
}
+ size_t num_hosts_node_pos = static_cast<size_t>(-1);
+ if (num_hosts)
+ {
+ num_hosts_node_pos = requests.size();
+ requests.emplace_back(zkutil::makeSetRequest(num_hosts_node_path, toString(start_node_exists ? (*num_hosts - 1) : *num_hosts), num_hosts_version));
+ }
+
Coordination::Responses responses;
auto code = zookeeper->tryMulti(requests, responses);
if (code == Coordination::Error::ZOK)
{
- --*num_hosts;
- String hosts_left_desc = ((*num_hosts == 0) ? "no hosts left" : fmt::format("{} hosts left", *num_hosts));
- LOG_INFO(log, "Created the 'finish' node in ZooKeeper for {}, {}", current_host_desc, hosts_left_desc);
+ String hosts_left_desc;
+ if (num_hosts)
+ {
+ if (start_node_exists)
+ --*num_hosts;
+ hosts_left_desc = (*num_hosts == 0) ? ", no hosts left" : fmt::format(", {} hosts left", *num_hosts);
+ }
+ LOG_INFO(log, "Created the 'finish' node in ZooKeeper for {}{}", current_host_desc, hosts_left_desc);
return;
}
@@ -999,18 +1042,18 @@ void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordinatio
LOG_TRACE(log, "{} (attempt #{}){}", message, attempt_no, will_try_again ? ", will try again" : "");
};
- if ((responses.size() > num_hosts_node_path_pos) &&
- (responses[num_hosts_node_path_pos]->error == Coordination::Error::ZBADVERSION))
+ if ((alive_node_pos < responses.size()) &&
+ (responses[alive_node_pos]->error == Coordination::Error::ZNONODE))
{
- show_error_before_next_attempt("Other host changed the 'num_hosts' node in ZooKeeper");
- num_hosts.reset(); /// needs to reread 'num_hosts' again
- }
- else if ((responses.size() > alive_node_path_pos) &&
- (responses[alive_node_path_pos]->error == Coordination::Error::ZNONODE))
- {
- show_error_before_next_attempt(fmt::format("Node {} in ZooKeeper doesn't exist", alive_node_path_pos));
+ show_error_before_next_attempt(fmt::format("Node {} doesn't exist", alive_node_path));
/// needs another attempt
}
+ else if ((num_hosts_node_pos < responses.size()) &&
+ (responses[num_hosts_node_pos]->error == Coordination::Error::ZBADVERSION))
+ {
+ show_error_before_next_attempt(fmt::format("The version of node {} changed", num_hosts_node_path));
+ num_hosts.reset(); /// needs to reread 'num_hosts' again
+ }
else
{
zkutil::KeeperMultiException::check(code, requests, responses);
@@ -1026,60 +1069,73 @@ void BackupCoordinationStageSync::createFinishNodeAndRemoveAliveNode(Coordinatio
int BackupCoordinationStageSync::getInitiatorVersion() const
{
std::lock_guard lock{mutex};
- auto it = state.hosts.find(String{kInitiator});
- if (it == state.hosts.end())
- throw Exception(ErrorCodes::LOGICAL_ERROR, "There is no initiator of this {} query, it's a bug", operation_name);
- const HostInfo & host_info = it->second;
- return host_info.version;
+ return state.hosts.at(String{kInitiator}).version;
}
-void BackupCoordinationStageSync::waitForOtherHostsToFinish() const
-{
- tryWaitForOtherHostsToFinishImpl(/* reason = */ "", /* throw_if_error = */ true, /* timeout = */ {});
-}
-
-
-bool BackupCoordinationStageSync::tryWaitForOtherHostsToFinishAfterError() const noexcept
+bool BackupCoordinationStageSync::waitOtherHostsFinish(bool throw_if_error) const
{
std::optional<std::chrono::milliseconds> timeout;
- if (finish_timeout_after_error.count() != 0)
- timeout = finish_timeout_after_error;
+ String reason;
- String reason = fmt::format("{} needs other hosts to finish before cleanup", current_host_desc);
- return tryWaitForOtherHostsToFinishImpl(reason, /* throw_if_error = */ false, timeout);
+ if (!throw_if_error)
+ {
+ if (finish_timeout_after_error.count() != 0)
+ timeout = finish_timeout_after_error;
+ reason = "after error before cleanup";
+ }
+
+ return waitOtherHostsFinishImpl(reason, timeout, throw_if_error);
}
-bool BackupCoordinationStageSync::tryWaitForOtherHostsToFinishImpl(const String & reason, bool throw_if_error, std::optional<std::chrono::milliseconds> timeout) const
+bool BackupCoordinationStageSync::waitOtherHostsFinishImpl(const String & reason, std::optional<std::chrono::milliseconds> timeout, bool throw_if_error) const
{
std::unique_lock lock{mutex};
/// TSA_NO_THREAD_SAFETY_ANALYSIS is here because Clang Thread Safety Analysis doesn't understand std::unique_lock.
- auto check_if_other_hosts_finish = [&](bool time_is_out) TSA_NO_THREAD_SAFETY_ANALYSIS
+ auto other_hosts_finished = [&]() TSA_NO_THREAD_SAFETY_ANALYSIS { return otherHostsFinishedNoLock(); };
+
+ if (other_hosts_finished())
{
- return checkIfOtherHostsFinish(reason, throw_if_error, time_is_out, timeout);
+ LOG_TRACE(log, "Other hosts have already finished");
+ return true;
+ }
+
+ bool failed_to_set_error = TSA_SUPPRESS_WARNING_FOR_READ(tried_to_set_error) && !TSA_SUPPRESS_WARNING_FOR_READ(state).host_with_error;
+ if (failed_to_set_error)
+ {
+ /// Tried to create the 'error' node, but failed.
+ /// Then it's better not to wait for other hosts to finish in this case because other hosts don't know they should finish.
+ LOG_INFO(log, "Skipping waiting for other hosts to finish because there was an error which we were unable to send to other hosts");
+ return false;
+ }
+
+ bool result = false;
+
+ /// TSA_NO_THREAD_SAFETY_ANALYSIS is here because Clang Thread Safety Analysis doesn't understand std::unique_lock.
+ auto check_if_hosts_finish = [&](bool time_is_out) TSA_NO_THREAD_SAFETY_ANALYSIS
+ {
+ return checkIfOtherHostsFinish(reason, timeout, time_is_out, result, throw_if_error);
};
if (timeout)
{
- if (state_changed.wait_for(lock, *timeout, [&] { return check_if_other_hosts_finish(/* time_is_out = */ false); }))
- return true;
- return check_if_other_hosts_finish(/* time_is_out = */ true);
+ if (!state_changed.wait_for(lock, *timeout, [&] { return check_if_hosts_finish(/* time_is_out = */ false); }))
+ check_if_hosts_finish(/* time_is_out = */ true);
}
else
{
- state_changed.wait(lock, [&] { return check_if_other_hosts_finish(/* time_is_out = */ false); });
- return true;
+ state_changed.wait(lock, [&] { return check_if_hosts_finish(/* time_is_out = */ false); });
}
+
+ return result;
}
-bool BackupCoordinationStageSync::checkIfOtherHostsFinish(const String & reason, bool throw_if_error, bool time_is_out, std::optional<std::chrono::milliseconds> timeout) const
+bool BackupCoordinationStageSync::checkIfOtherHostsFinish(
+ const String & reason, std::optional<std::chrono::milliseconds> timeout, bool time_is_out, bool & result, bool throw_if_error) const
{
- if (should_stop_watching_thread)
- throw Exception(ErrorCodes::LOGICAL_ERROR, "finish() was called while waiting for other hosts to finish");
-
if (throw_if_error)
process_list_element->checkTimeLimit();
@@ -1088,38 +1144,261 @@ bool BackupCoordinationStageSync::checkIfOtherHostsFinish(const String & reason,
if ((host == current_host) || host_info.finished)
continue;
+ String reason_text = reason.empty() ? "" : (" " + reason);
+
String host_status;
if (!host_info.started)
host_status = fmt::format(": the host hasn't started working on this {} yet", operation_name);
else if (!host_info.connected)
host_status = fmt::format(": the host is currently disconnected, last connection was at {}", host_info.last_connection_time);
- if (!time_is_out)
+ if (time_is_out)
{
- String reason_text = reason.empty() ? "" : (" because " + reason);
- LOG_TRACE(log, "Waiting for {} to finish{}{}", getHostDesc(host), reason_text, host_status);
- return false;
- }
- else
- {
- String reason_text = reason.empty() ? "" : fmt::format(" (reason of waiting: {})", reason);
- if (!throw_if_error)
- {
- LOG_INFO(log, "Waited longer than timeout {} for {} to finish{}{}",
- *timeout, getHostDesc(host), host_status, reason_text);
- return false;
- }
- else
+ if (throw_if_error)
{
throw Exception(ErrorCodes::FAILED_TO_SYNC_BACKUP_OR_RESTORE,
"Waited longer than timeout {} for {} to finish{}{}",
- *timeout, getHostDesc(host), host_status, reason_text);
+ *timeout, getHostDesc(host), reason_text, host_status);
}
+ LOG_INFO(log, "Waited longer than timeout {} for {} to finish{}{}",
+ *timeout, getHostDesc(host), reason_text, host_status);
+ result = false;
+ return true; /// stop waiting
}
+
+ if (should_stop_watching_thread)
+ {
+ LOG_ERROR(log, "waitOtherHostsFinish({}) can't wait for other hosts to finish after the watching thread stopped", throw_if_error);
+ chassert(false, "waitOtherHostsFinish() can't wait for other hosts to finish after the watching thread stopped");
+ if (throw_if_error)
+ throw Exception(ErrorCodes::LOGICAL_ERROR, "waitOtherHostsFinish() can't wait for other hosts to finish after the watching thread stopped");
+ result = false;
+ return true; /// stop waiting
+ }
+
+ LOG_TRACE(log, "Waiting for {} to finish{}{}", getHostDesc(host), reason_text, host_status);
+ return false; /// wait for next change of `state_changed`
}
LOG_TRACE(log, "Other hosts finished working on this {}", operation_name);
+ result = true;
+ return true; /// stop waiting
+}
+
+
+bool BackupCoordinationStageSync::finished() const
+{
+ std::lock_guard lock{mutex};
+ return finishedNoLock();
+}
+
+
+bool BackupCoordinationStageSync::finishedNoLock() const
+{
+ return state.hosts.at(current_host).finished;
+}
+
+
+bool BackupCoordinationStageSync::otherHostsFinished() const
+{
+ std::lock_guard lock{mutex};
+ return otherHostsFinishedNoLock();
+}
+
+
+bool BackupCoordinationStageSync::otherHostsFinishedNoLock() const
+{
+ for (const auto & [host, host_info] : state.hosts)
+ {
+ if (!host_info.finished && (host != current_host))
+ return false;
+ }
return true;
}
+
+bool BackupCoordinationStageSync::allHostsFinishedNoLock() const
+{
+ return finishedNoLock() && otherHostsFinishedNoLock();
+}
+
+
+Strings BackupCoordinationStageSync::getUnfinishedHosts() const
+{
+ std::lock_guard lock{mutex};
+ return getUnfinishedHostsNoLock();
+}
+
+
+Strings BackupCoordinationStageSync::getUnfinishedHostsNoLock() const
+{
+ if (allHostsFinishedNoLock())
+ return {};
+
+ Strings res;
+ res.reserve(all_hosts.size());
+ for (const auto & [host, host_info] : state.hosts)
+ {
+ if (!host_info.finished)
+ res.emplace_back(host);
+ }
+ return res;
+}
+
+
+Strings BackupCoordinationStageSync::getUnfinishedOtherHosts() const
+{
+ std::lock_guard lock{mutex};
+ return getUnfinishedOtherHostsNoLock();
+}
+
+
+Strings BackupCoordinationStageSync::getUnfinishedOtherHostsNoLock() const
+{
+ if (otherHostsFinishedNoLock())
+ return {};
+
+ Strings res;
+ res.reserve(all_hosts.size() - 1);
+ for (const auto & [host, host_info] : state.hosts)
+ {
+ if (!host_info.finished && (host != current_host))
+ res.emplace_back(host);
+ }
+ return res;
+}
+
+
+bool BackupCoordinationStageSync::setError(std::exception_ptr exception, bool throw_if_error)
+{
+ try
+ {
+ std::rethrow_exception(exception);
+ }
+ catch (const Exception & e)
+ {
+ return setError(e, throw_if_error);
+ }
+ catch (...)
+ {
+ return setError(Exception{getCurrentExceptionMessageAndPattern(true, true), getCurrentExceptionCode()}, throw_if_error);
+ }
+}
+
+
+bool BackupCoordinationStageSync::setError(const Exception & exception, bool throw_if_error)
+{
+ try
+ {
+ /// Most likely this exception has been already logged so here we're logging it without stacktrace.
+ String exception_message = getExceptionMessage(exception, /* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true);
+ LOG_INFO(log, "Sending exception from {} to other hosts: {}", current_host_desc, exception_message);
+
+ {
+ std::lock_guard lock{mutex};
+ if (state.host_with_error)
+ {
+ LOG_INFO(log, "The error node already exists");
+ return true;
+ }
+
+ if (tried_to_set_error)
+ {
+ LOG_INFO(log, "Skipped creating the error node because earlier we failed to do that");
+ return false;
+ }
+ }
+
+ auto holder = with_retries.createRetriesControlHolder("BackupStageSync::setError", WithRetries::kErrorHandling);
+ holder.retries_ctl.retryLoop([&, &zookeeper = holder.faulty_zookeeper]()
+ {
+ with_retries.renewZooKeeper(zookeeper);
+ createErrorNode(exception, zookeeper);
+ });
+
+ {
+ std::lock_guard lock{mutex};
+ tried_to_set_error = true;
+ return true;
+ }
+ }
+ catch (...)
+ {
+ LOG_TRACE(log, "Caught exception while creating the error node in ZooKeeper for this {}: {}",
+ is_restore ? "restore" : "backup",
+ getCurrentExceptionMessage(/* with_stacktrace= */ false, /* check_embedded_stacktrace= */ true));
+
+ std::lock_guard lock{mutex};
+ tried_to_set_error = true;
+
+ if (throw_if_error)
+ throw;
+ return false;
+ }
+}
+
+
+void BackupCoordinationStageSync::createErrorNode(const Exception & exception, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper)
+{
+ String serialized_error;
+ {
+ WriteBufferFromOwnString buf;
+ writeStringBinary(current_host, buf);
+ writeException(exception, buf, true);
+ serialized_error = buf.str();
+ }
+
+ auto code = zookeeper->tryCreate(error_node_path, serialized_error, zkutil::CreateMode::Persistent);
+
+ if (code == Coordination::Error::ZOK)
+ {
+ std::lock_guard lock{mutex};
+ if (!state.host_with_error)
+ {
+ state.host_with_error = current_host;
+ state.hosts.at(current_host).exception = parseErrorNode(serialized_error).first;
+ }
+ LOG_TRACE(log, "Sent exception from {} to other hosts", current_host_desc);
+ return;
+ }
+
+ if (code == Coordination::Error::ZNODEEXISTS)
+ {
+ String another_error = zookeeper->get(error_node_path);
+ auto [another_exception, host] = parseErrorNode(another_error);
+ if (another_exception)
+ {
+ std::lock_guard lock{mutex};
+ if (!state.host_with_error)
+ {
+ state.host_with_error = host;
+ state.hosts.at(host).exception = another_exception;
+ }
+ LOG_INFO(log, "Another error is already assigned for this {}", operation_name);
+ return;
+ }
+ }
+
+ throw zkutil::KeeperException::fromPath(code, error_node_path);
+}
+
+
+std::pair<std::exception_ptr, String> BackupCoordinationStageSync::parseErrorNode(const String & error_node_contents) const
+{
+ ReadBufferFromOwnString buf{error_node_contents};
+ String host;
+ readStringBinary(host, buf);
+ if (std::find(all_hosts.begin(), all_hosts.end(), host) == all_hosts.end())
+ return {};
+ auto exception = std::make_exception_ptr(readException(buf, fmt::format("Got error from {}", getHostDesc(host))));
+ return {exception, host};
+}
+
+
+bool BackupCoordinationStageSync::isErrorSet() const
+{
+ std::lock_guard lock{mutex};
+ return state.host_with_error.has_value();
+}
+
}
diff --git a/src/Backups/BackupCoordinationStageSync.h b/src/Backups/BackupCoordinationStageSync.h
index dc0d3c3c83d..11d3d1cf6f4 100644
--- a/src/Backups/BackupCoordinationStageSync.h
+++ b/src/Backups/BackupCoordinationStageSync.h
@@ -1,7 +1,9 @@
#pragma once
+#include <Backups/BackupConcurrencyCheck.h>
#include <Backups/WithRetries.h>
+
namespace DB
{
@@ -9,12 +11,16 @@ namespace DB
class BackupCoordinationStageSync
{
public:
+ /// Empty string as the current host is used to mark the initiator of a BACKUP ON CLUSTER or RESTORE ON CLUSTER query.
+ static const constexpr std::string_view kInitiator;
+
BackupCoordinationStageSync(
bool is_restore_, /// true if this is a RESTORE ON CLUSTER command, false if this is a BACKUP ON CLUSTER command
const String & zookeeper_path_, /// path to the "stage" folder in ZooKeeper
const String & current_host_, /// the current host, or an empty string if it's the initiator of the BACKUP/RESTORE ON CLUSTER command
const Strings & all_hosts_, /// all the hosts (including the initiator and the current host) performing the BACKUP/RESTORE ON CLUSTER command
bool allow_concurrency_, /// whether it's allowed to have concurrent backups or restores.
+ BackupConcurrencyCounters & concurrency_counters_,
const WithRetries & with_retries_,
ThreadPoolCallbackRunnerUnsafe<void> schedule_,
QueryStatusPtr process_list_element_,
@@ -22,30 +28,37 @@ public:
~BackupCoordinationStageSync();
+ /// Sets that the BACKUP or RESTORE query was sent to other hosts.
+ void setQueryIsSentToOtherHosts();
+ bool isQuerySentToOtherHosts() const;
+
/// Sets the stage of the current host and signal other hosts if there were other hosts waiting for that.
void setStage(const String & stage, const String & stage_result = {});
- /// Waits until all the specified hosts come to the specified stage.
- /// The function returns the results which specified hosts set when they came to the required stage.
- /// If it doesn't happen before the timeout then the function will stop waiting and throw an exception.
- Strings waitForHostsToReachStage(const String & stage_to_wait, const Strings & hosts, std::optional<std::chrono::milliseconds> timeout = {}) const;
-
- /// Waits until all the other hosts finish their work.
- /// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
- void waitForOtherHostsToFinish() const;
-
- /// Lets other host know that the current host has finished its work.
- void finish(bool & other_hosts_also_finished);
+ /// Waits until specified hosts come to the specified stage.
+ /// The function returns the results which the specified hosts set when they came to the required stage.
+ Strings waitHostsReachStage(const Strings & hosts, const String & stage_to_wait) const;
/// Lets other hosts know that the current host has encountered an error.
- bool trySetError(std::exception_ptr exception) noexcept;
+ /// The function returns true if it successfully created the error node or if the error node was found already exist.
+ bool setError(std::exception_ptr exception, bool throw_if_error);
+ bool isErrorSet() const;
- /// Waits until all the other hosts finish their work (as a part of error-handling process).
- /// Doesn't stops waiting if some host encounters an error or gets cancelled.
- bool tryWaitForOtherHostsToFinishAfterError() const noexcept;
+ /// Waits until the hosts other than the current host finish their work. Must be called before finish().
+ /// Stops waiting and throws an exception if another host encounters an error or if some host gets cancelled.
+ bool waitOtherHostsFinish(bool throw_if_error) const;
+ bool otherHostsFinished() const;
- /// Lets other host know that the current host has finished its work (as a part of error-handling process).
- bool tryFinishAfterError(bool & other_hosts_also_finished) noexcept;
+ /// Lets other hosts know that the current host has finished its work.
+ bool finish(bool throw_if_error);
+ bool finished() const;
+
+ /// Returns true if all the hosts have finished.
+ bool allHostsFinished() const { return finished() && otherHostsFinished(); }
+
+ /// Returns a list of the hosts which haven't finished yet.
+ Strings getUnfinishedHosts() const;
+ Strings getUnfinishedOtherHosts() const;
/// Returns a printable name of a specific host. For empty host the function returns "initiator".
static String getHostDesc(const String & host);
@@ -78,14 +91,17 @@ private:
/// Reads the current state from ZooKeeper without throwing exceptions.
void readCurrentState(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
+
+ /// Creates a stage node to let other hosts know we've reached the specified stage.
+ void createStageNode(const String & stage, const String & stage_result, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
String getStageNodePath(const String & stage) const;
/// Lets other hosts know that the current host has encountered an error.
- bool trySetError(const Exception & exception);
- void setError(const Exception & exception);
+ bool setError(const Exception & exception, bool throw_if_error);
+ void createErrorNode(const Exception & exception, Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
/// Deserializes an error stored in the error node.
- static std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents);
+ std::pair<std::exception_ptr, String> parseErrorNode(const String & error_node_contents) const;
/// Reset the `connected` flag for each host.
void resetConnectedFlag();
@@ -102,19 +118,27 @@ private:
void cancelQueryIfDisconnectedTooLong();
/// Used by waitForHostsToReachStage() to check if everything is ready to return.
- bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, bool time_is_out, std::optional<std::chrono::milliseconds> timeout, Strings & results) const TSA_REQUIRES(mutex);
+ bool checkIfHostsReachStage(const Strings & hosts, const String & stage_to_wait, Strings & results) const TSA_REQUIRES(mutex);
/// Creates the 'finish' node.
- bool tryFinishImpl();
- bool tryFinishImpl(bool & other_hosts_also_finished, bool throw_if_error, WithRetries::Kind retries_kind);
- void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper);
+ bool finishImpl(bool throw_if_error, WithRetries::Kind retries_kind);
+ void createFinishNodeAndRemoveAliveNode(Coordination::ZooKeeperWithFaultInjection::Ptr zookeeper, bool throw_if_error);
/// Returns the version used by the initiator.
int getInitiatorVersion() const;
/// Waits until all the other hosts finish their work.
- bool tryWaitForOtherHostsToFinishImpl(const String & reason, bool throw_if_error, std::optional<std::chrono::milliseconds> timeout) const;
- bool checkIfOtherHostsFinish(const String & reason, bool throw_if_error, bool time_is_out, std::optional<std::chrono::milliseconds> timeout) const TSA_REQUIRES(mutex);
+ bool waitOtherHostsFinishImpl(const String & reason, std::optional<std::chrono::milliseconds> timeout, bool throw_if_error) const;
+ bool checkIfOtherHostsFinish(const String & reason, std::optional<std::chrono::milliseconds> timeout, bool time_is_out, bool & result, bool throw_if_error) const TSA_REQUIRES(mutex);
+
+ /// Returns true if all the hosts have finished.
+ bool allHostsFinishedNoLock() const TSA_REQUIRES(mutex);
+ bool finishedNoLock() const TSA_REQUIRES(mutex);
+ bool otherHostsFinishedNoLock() const TSA_REQUIRES(mutex);
+
+ /// Returns a list of the hosts which haven't finished yet.
+ Strings getUnfinishedHostsNoLock() const TSA_REQUIRES(mutex);
+ Strings getUnfinishedOtherHostsNoLock() const TSA_REQUIRES(mutex);
const bool is_restore;
const String operation_name;
@@ -138,15 +162,16 @@ private:
/// Paths in ZooKeeper.
const std::filesystem::path zookeeper_path;
const String root_zookeeper_path;
- const String operation_node_path;
+ const String operation_zookeeper_path;
const String operation_node_name;
- const String stage_node_path;
const String start_node_path;
const String finish_node_path;
const String num_hosts_node_path;
+ const String error_node_path;
const String alive_node_path;
const String alive_tracker_node_path;
- const String error_node_path;
+
+ std::optional<BackupConcurrencyCheck> concurrency_check;
std::shared_ptr<Poco::Event> zk_nodes_changed;
@@ -176,25 +201,21 @@ private:
{
std::map<String, HostInfo> hosts; /// std::map because we need to compare states
std::optional host_with_error;
- bool cancelled = false;
bool operator ==(const State & other) const;
bool operator !=(const State & other) const;
+ void merge(const State & other);
};
State state TSA_GUARDED_BY(mutex);
mutable std::condition_variable state_changed;
std::future<void> watching_thread_future;
- std::atomic should_stop_watching_thread = false;
+ bool should_stop_watching_thread TSA_GUARDED_BY(mutex) = false;
- struct FinishResult
- {
- bool succeeded = false;
- std::exception_ptr exception;
- bool other_hosts_also_finished = false;
- };
- FinishResult finish_result TSA_GUARDED_BY(mutex);
+ bool query_is_sent_to_other_hosts TSA_GUARDED_BY(mutex) = false;
+ bool tried_to_finish TSA_GUARDED_BY(mutex) = false;
+ bool tried_to_set_error TSA_GUARDED_BY(mutex) = false;
mutable std::mutex mutex;
};
diff --git a/src/Backups/BackupsWorker.cpp b/src/Backups/BackupsWorker.cpp
index 70ef6d43169..226a90d1dbf 100644
--- a/src/Backups/BackupsWorker.cpp
+++ b/src/Backups/BackupsWorker.cpp
@@ -330,6 +330,7 @@ std::pair BackupsWorker::start(const ASTPtr & backup_
struct BackupsWorker::BackupStarter
{
BackupsWorker & backups_worker;
+ LoggerPtr log;
std::shared_ptr<ASTBackupQuery> backup_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr backup_context;
@@ -346,6 +347,7 @@ struct BackupsWorker::BackupStarter
BackupStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
+ , log(backups_worker.log)
, backup_query(std::static_pointer_cast<ASTBackupQuery>(query_->clone()))
, query_context(context_)
, backup_context(Context::createCopy(query_context))
@@ -400,9 +402,20 @@ struct BackupsWorker::BackupStarter
chassert(!backup);
backup = backups_worker.openBackupForWriting(backup_info, backup_settings, backup_coordination, backup_context);
- backups_worker.doBackup(
- backup, backup_query, backup_id, backup_name_for_logging, backup_settings, backup_coordination, backup_context,
- on_cluster, cluster);
+ backups_worker.doBackup(backup, backup_query, backup_id, backup_settings, backup_coordination, backup_context,
+ on_cluster, cluster);
+
+ backup_coordination->finish(/* throw_if_error = */ true);
+ backup.reset();
+
+ /// The backup coordination is not needed anymore.
+ if (!is_internal_backup)
+ backup_coordination->cleanup(/* throw_if_error = */ true);
+ backup_coordination.reset();
+
+ /// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
+ LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
+ backups_worker.setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
void onException()
@@ -417,16 +430,29 @@ struct BackupsWorker::BackupStarter
if (backup && !backup->setIsCorrupted())
should_remove_files_in_backup = false;
- if (backup_coordination && backup_coordination->trySetError(std::current_exception()))
+ bool all_hosts_finished = false;
+
+ if (backup_coordination && backup_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
- bool other_hosts_finished = backup_coordination->tryWaitForOtherHostsToFinishAfterError();
+ bool other_hosts_finished = !is_internal_backup
+ && (!backup_coordination->isBackupQuerySentToOtherHosts() || backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
- if (should_remove_files_in_backup && other_hosts_finished)
- backup->tryRemoveAllFiles();
-
- backup_coordination->tryFinishAfterError();
+ all_hosts_finished = backup_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished;
}
+ if (!all_hosts_finished)
+ should_remove_files_in_backup = false;
+
+ if (backup && should_remove_files_in_backup)
+ backup->tryRemoveAllFiles();
+
+ backup.reset();
+
+ if (backup_coordination && all_hosts_finished)
+ backup_coordination->cleanup(/* throw_if_error = */ false);
+
+ backup_coordination.reset();
+
backups_worker.setStatusSafe(backup_id, getBackupStatusFromCurrentException());
}
};
@@ -498,7 +524,6 @@ void BackupsWorker::doBackup(
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const OperationID & backup_id,
- const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@@ -522,10 +547,10 @@ void BackupsWorker::doBackup(
backup_settings.copySettingsToQuery(*backup_query);
sendQueryToOtherHosts(*backup_query, cluster, backup_settings.shard_num, backup_settings.replica_num,
context, required_access, backup_coordination->getOnClusterInitializationKeeperRetriesInfo());
- backup_coordination->setBackupQueryWasSentToOtherHosts();
+ backup_coordination->setBackupQueryIsSentToOtherHosts();
/// Wait until all the hosts have written their backup entries.
- backup_coordination->waitForOtherHostsToFinish();
+ backup_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@@ -570,18 +595,8 @@ void BackupsWorker::doBackup(
compressed_size = backup->getCompressedSize();
}
- /// Close the backup.
- backup.reset();
-
- /// The backup coordination is not needed anymore.
- backup_coordination->finish();
-
/// NOTE: we need to update metadata again after backup->finalizeWriting(), because backup metadata is written there.
setNumFilesAndSize(backup_id, num_files, total_size, num_entries, uncompressed_size, compressed_size, 0, 0);
-
- /// NOTE: setStatus is called after setNumFilesAndSize in order to have actual information in a backup log record
- LOG_INFO(log, "{} {} was created successfully", (is_internal_backup ? "Internal backup" : "Backup"), backup_name_for_logging);
- setStatus(backup_id, BackupStatus::BACKUP_CREATED);
}
@@ -688,6 +703,7 @@ void BackupsWorker::writeBackupEntries(
struct BackupsWorker::RestoreStarter
{
BackupsWorker & backups_worker;
+ LoggerPtr log;
std::shared_ptr<ASTRestoreQuery> restore_query;
ContextPtr query_context; /// We have to keep `query_context` until the end of the operation because a pointer to it is stored inside the ThreadGroup we're using.
ContextMutablePtr restore_context;
@@ -703,6 +719,7 @@ struct BackupsWorker::RestoreStarter
RestoreStarter(BackupsWorker & backups_worker_, const ASTPtr & query_, const ContextPtr & context_)
: backups_worker(backups_worker_)
+ , log(backups_worker.log)
, restore_query(std::static_pointer_cast<ASTRestoreQuery>(query_->clone()))
, query_context(context_)
, restore_context(Context::createCopy(query_context))
@@ -754,16 +771,17 @@ struct BackupsWorker::RestoreStarter
}
restore_coordination = backups_worker.makeRestoreCoordination(on_cluster, restore_settings, restore_context);
- backups_worker.doRestore(
- restore_query,
- restore_id,
- backup_name_for_logging,
- backup_info,
- restore_settings,
- restore_coordination,
- restore_context,
- on_cluster,
- cluster);
+ backups_worker.doRestore(restore_query, restore_id, backup_info, restore_settings, restore_coordination, restore_context,
+ on_cluster, cluster);
+
+ /// The restore coordination is not needed anymore.
+ restore_coordination->finish(/* throw_if_error = */ true);
+ if (!is_internal_restore)
+ restore_coordination->cleanup(/* throw_if_error = */ true);
+ restore_coordination.reset();
+
+ LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
+ backups_worker.setStatus(restore_id, BackupStatus::RESTORED);
}
void onException()
@@ -771,12 +789,16 @@ struct BackupsWorker::RestoreStarter
/// Something bad happened, some data were not restored.
tryLogCurrentException(backups_worker.log, fmt::format("Failed to restore from {} {}", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging));
- if (restore_coordination && restore_coordination->trySetError(std::current_exception()))
+ if (restore_coordination && restore_coordination->setError(std::current_exception(), /* throw_if_error = */ false))
{
- restore_coordination->tryWaitForOtherHostsToFinishAfterError();
- restore_coordination->tryFinishAfterError();
+ bool other_hosts_finished = !is_internal_restore
+ && (!restore_coordination->isRestoreQuerySentToOtherHosts() || restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ false));
+ if (restore_coordination->finish(/* throw_if_error = */ false) && other_hosts_finished)
+ restore_coordination->cleanup(/* throw_if_error = */ false);
}
+ restore_coordination.reset();
+
backups_worker.setStatusSafe(restore_id, getRestoreStatusFromCurrentException());
}
};
@@ -839,7 +861,6 @@ BackupPtr BackupsWorker::openBackupForReading(const BackupInfo & backup_info, co
void BackupsWorker::doRestore(
const std::shared_ptr<ASTRestoreQuery> & restore_query,
const OperationID & restore_id,
- const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr<IRestoreCoordination> restore_coordination,
@@ -883,10 +904,10 @@ void BackupsWorker::doRestore(
restore_settings.copySettingsToQuery(*restore_query);
sendQueryToOtherHosts(*restore_query, cluster, restore_settings.shard_num, restore_settings.replica_num,
context, {}, restore_coordination->getOnClusterInitializationKeeperRetriesInfo());
- restore_coordination->setRestoreQueryWasSentToOtherHosts();
+ restore_coordination->setRestoreQueryIsSentToOtherHosts();
/// Wait until all the hosts have done with their restoring work.
- restore_coordination->waitForOtherHostsToFinish();
+ restore_coordination->waitOtherHostsFinish(/* throw_if_error = */ true);
}
else
{
@@ -906,12 +927,6 @@ void BackupsWorker::doRestore(
backup, context, getThreadPool(ThreadPoolId::RESTORE), after_task_callback};
restorer.run(RestorerFromBackup::RESTORE);
}
-
- /// The restore coordination is not needed anymore.
- restore_coordination->finish();
-
- LOG_INFO(log, "Restored from {} {} successfully", (is_internal_restore ? "internal backup" : "backup"), backup_name_for_logging);
- setStatus(restore_id, BackupStatus::RESTORED);
}
@@ -944,7 +959,7 @@ BackupsWorker::makeBackupCoordination(bool on_cluster, const BackupSettings & ba
if (!on_cluster)
{
return std::make_shared<BackupCoordinationLocal>(
- *backup_settings.backup_uuid, !backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
+ !backup_settings.deduplicate_files, allow_concurrent_backups, *concurrency_counters);
}
bool is_internal_backup = backup_settings.internal;
@@ -982,8 +997,7 @@ BackupsWorker::makeRestoreCoordination(bool on_cluster, const RestoreSettings &
{
if (!on_cluster)
{
- return std::make_shared<RestoreCoordinationLocal>(
- *restore_settings.restore_uuid, allow_concurrent_restores, *concurrency_counters);
+ return std::make_shared<RestoreCoordinationLocal>(allow_concurrent_restores, *concurrency_counters);
}
bool is_internal_restore = restore_settings.internal;
diff --git a/src/Backups/BackupsWorker.h b/src/Backups/BackupsWorker.h
index 37f91e269a9..2e5ca84f3f6 100644
--- a/src/Backups/BackupsWorker.h
+++ b/src/Backups/BackupsWorker.h
@@ -81,7 +81,6 @@ private:
BackupMutablePtr backup,
const std::shared_ptr<ASTBackupQuery> & backup_query,
const BackupOperationID & backup_id,
- const String & backup_name_for_logging,
const BackupSettings & backup_settings,
std::shared_ptr<IBackupCoordination> backup_coordination,
ContextMutablePtr context,
@@ -102,7 +101,6 @@ private:
void doRestore(
const std::shared_ptr<ASTRestoreQuery> & restore_query,
const BackupOperationID & restore_id,
- const String & backup_name_for_logging,
const BackupInfo & backup_info,
RestoreSettings restore_settings,
std::shared_ptr